multiprocessing通过使用子进程而非线程有效的绕过了全局解释器锁。multiprocessing可以利用cpu的多核性能。multiprocessing的Api与threading类似
import multiprocessing import time def fun(i): print(f"process{i} start at {time.strftime('%X')}") if __name__ == "__main__": ctx = multiprocessing.get_context() p1 = ctx.Process(target=fun, args=(1,)) p2 = ctx.Process(target=fun, args=(2,)) p1.start() p2.start() p1.join() p2.join()
*class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
与threading.Thread的api类似。run()、start()、join()、name、is_alive()
multiprocessing的Queue类时queue.Queue的克隆,是一个线程安全的队列。put方法添加元素时如果队满会一直阻塞直到有空间放入元素。get方法获取元素时如果队空也会一直阻塞。
multiprocessing.Queue([maxsize])
import multiprocessing import random import time import random class Producer(multiprocessing.Process): def __init__(self, queue): super().__init__() self.queue = queue def run(self): for i in range(10): item = random.randint(0,256) self.queue.put(item) print(f"producer append {item} to queue") time.sleep(1) print(f"the size of queue is {self.queue.qsize()}") class Consumer(multiprocessing.Process): def __init__(self, queue): super().__init__() self.queue = queue def run(self): while True: if self.queue.empty(): print("queue is empty") break else: time.sleep(2) item = self.queue.get() print(f"Consumer get {item}") time.sleep(1) if __name__ =="__main__": queue = multiprocessing.Queue() producer = Producer(queue) consumer = Consumer(queue) producer.start() consumer.start() producer.join() consumer.join()
multiprocessing.Pipe([duplex])
返回一对Connection对象,(con1,con2),分别表示管道两端。duplex默认为True,表示可以双向通信。如果False为单向的con1只能接收消息,con2只能发送。
multiprocessing.connection.Connection
连接对象,允许发送可序列化对象。
import multiprocessing import time def send(left, right): left.send(['left', time.strftime("%X")]) print(left.recv()) def recv(left,right): right.send(['right', time.strftime("%X")]) print(right.recv()) if __name__ == '__main__': left,right = multiprocessing.Pipe() s_p = multiprocessing.Process(target=send, args=(left,right)) s_p.start() r_p = multiprocessing.Process(target=recv, args=(left,right)) r_p.start() s_p.join() r_p.join()
返回从共享内存上创建的ctypes对象,默认情况下返回的对象实际上是经过了同步器包装过的,可以通过value属性访问对象本身。
from multiprocessing import Process, Value def f(v): with v.get_lock(): # += 类操作不具有原子性,使用对象内部关联锁 v.value+=1 if __name__ == "__main__": v = Value('i',0) p1 = Process(target=f, args=(v,)) p2 = Process(target=f, args=(v,)) p1.start() p2.start() p1.join() p2.join() print(v.value)
从共享内存申请并返回一个具有ctypes类型的数组对象,默认情况下返回值实际上是被同步器包装过的数组对象。
from multiprocessing import Process, Array def f(arr, i): arr[i]=i if __name__ =="__main__": arr = Array('i', 10) processes = [] for i in range(10): process = Process(target=f,args=(arr,i)) processes.append(process) for p in processes: p.start() for p in processes: p.join() print(arr[:])
管理器维护一个用于管理共享对象的服务,其他进程可以通过代理访问这些共享对象。
multiprocessing.Manager()返回一个已启动的SyncManager管理器对象,可以用于在不同进程中共享数据。
支持 list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
from multiprocessing import Manager , Process def f(mylist, i): mylist.append(i) if __name__ =="__main__": manager = Manager() mylist = manager.list() processes = [] for i in range(10): p = Process(target=f, args=(mylist, i)) processes.append(p) for p in processes: p.start() for p in processes: p.join() print(mylist)
multiprocessing.Pool([processes[,initalizer[,initargs[,maxtaskperchild[,[context]]]]])
返回一个进程池对象,它控制可以提交作业的工作进程池,支持超时和回调的异步结果以及并行的map。
- processes进程数,如果为None,则使用os.cup_count()返回的值
- 如果initalizer不为None,则每个工作进程将会在启动时调用initalizer(*initargs)
multiprocessing.pool.AsyncResult
Pool.apply_async和pool.map_async()返回的对象所属的类。
from multiprocessing import Pool import time def f(x): time.sleep(1) return x**x def mf(x): time.sleep(0.5) return x*2 def initializer(*args): print(args, time.strftime("%X")) if __name__ =="__main__": with Pool(processes=4, initializer=initializer,initargs=("init-",)) as pool: print(f"apply - start {time.strftime('%X')}") print(pool.apply(f,(10,))) # 阻塞直到运行完成 print(f"apply - end{time.strftime('%X')}") print(f"apply_async - start {time.strftime('%X')}") result = pool.apply_async(f,(10,)) # 异步执行不阻塞当前进程 print(f"apply_async - end{time.strftime('%X')}") print(result.get()) print(f"map - start {time.strftime('%X')}") print(pool.map(mf,[i for i in range(10)])) # 阻塞直到运行完成 print(f"map- end{time.strftime('%X')}") print(f"map_async - start {time.strftime('%X')}") result = pool.map_async(mf,[i for i in range(10)]) # 异步执行不阻塞当前进程 print(f"mapy_async - end{time.strftime('%X')}") print(result.get())