# Minimal example: run a function in one child process and wait for it.
from multiprocessing import Process


def f(name):
    """Print a greeting for ``name``; used as the child process target."""
    print('hello', name)


if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    # Block until the child has finished.
    p.join()
# Minimal example: map a function over a list using a pool of workers.
from multiprocessing import Pool


def f(x):
    """Return the square of ``x``."""
    return x * x


if __name__ == '__main__':
    # Leaving the ``with`` block shuts the pool down.
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
# Example: show the module name and process ids seen by parent and child.
from multiprocessing import Process
import os


def info(title):
    """Print ``title`` followed by the module name, parent pid and own pid."""
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def f(name):
    """Child process target: print process info, then greet ``name``."""
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
# Example: pass a result from a child process to the parent through a Queue.
from multiprocessing import Process, Queue


def f(q):
    """Put one sample payload on ``q``."""
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # Read the item before joining the child.
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
# Example: pass a result from a child process to the parent through a Pipe.
from multiprocessing import Process, Pipe


def f(conn):
    """Send one sample payload over ``conn`` and then close it."""
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    # Each process uses its own end of the pipe.
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
注意:不能在不同进程同时读写管道的同一端(比如多个进程同时操作parent_conn),这会导致管道内的数据异常。如果每个进程只读写某一端的数据,则不会有问题。也就是说返回的parent_conn, child_conn最好分配给两个进程分别使用
# Example: serialise output from several processes with a shared Lock.
from multiprocessing import Process, Lock


def f(l, i):
    """Print a greeting for index ``i`` while holding lock ``l``.

    The ``with`` statement releases the lock even if printing raises.
    """
    with l:
        print('hello world', i)


if __name__ == '__main__':
    lock = Lock()
    workers = [Process(target=f, args=(lock, num)) for num in range(10)]
    for worker in workers:
        worker.start()
    # Wait for every child explicitly instead of relying on interpreter exit.
    for worker in workers:
        worker.join()
# Example: share a number and an array with a child via shared memory.
from multiprocessing import Process, Value, Array


def f(n, a):
    """Set ``n.value`` to 3.1415927 and negate every element of ``a`` in place."""
    n.value = 3.1415927
    for i, item in enumerate(a):
        a[i] = -item


if __name__ == '__main__':
    num = Value('d', 0.0)           # 'd': double-precision float
    arr = Array('i', range(10))     # 'i': signed int
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    # The child's writes are visible here.
    print(num.value)
    print(arr[:])
# Example: share a dict and a list with a child through a Manager.
from multiprocessing import Process, Manager


def f(d, l):
    """Insert three sample entries into ``d`` and reverse ``l`` in place."""
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        # Print inside the ``with`` block, while the manager is still running.
        print(d)
        print(l)
Manager()支持的所有类型: list 、 dict 、Namespace 、Lock 、RLock 、Semaphore 、BoundedSemaphore 、 Condition 、Event 、Barrier 、Queue 、Value 和 Array
# Example: expose a custom class through a customised manager.
from multiprocessing.managers import BaseManager


class MathsClass:
    """Tiny arithmetic helper used to demonstrate a custom manager type."""

    def add(self, x, y):
        """Return ``x + y``."""
        return x + y

    def mul(self, x, y):
        """Return ``x * y``."""
        return x * y


class MyManager(BaseManager):
    """Manager subclass on which the custom ``Maths`` type is registered."""


# After this call ``manager.Maths()`` is available on MyManager instances.
MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56
# Server side of a remote manager: serves one shared queue to remote clients.
from multiprocessing.managers import BaseManager
from queue import Queue

# The single queue instance handed out to every client.
queue = Queue()


class QueueManager(BaseManager):
    """Manager that exposes the shared queue as ``get_queue()``."""


QueueManager.register('get_queue', callable=lambda: queue)

# Guarded so that importing this module does not block in serve_forever().
if __name__ == '__main__':
    # NOTE(review): '' listens on every interface and the authkey is a
    # hard-coded demo value. Manager traffic is pickle-based, so use a private
    # key (and a restricted bind address) outside of a demo.
    m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
# Client side of a remote manager: puts one item on the server's queue.
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Client-side manager; ``get_queue`` is registered by name only."""


# No callable here: the client only needs to know the type id.
QueueManager.register('get_queue')

# Guarded so that importing this module does not open a network connection.
if __name__ == '__main__':
    # Connect to an explicit host: an empty host is only meaningful when
    # binding, and connecting to it is not portable (it fails on Windows).
    # Replace 127.0.0.1 with the server's address when it runs elsewhere.
    m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra')
    m.connect()
    queue = m.get_queue()
    queue.put('hello')
# Client side of a remote manager: reads one item from the server's queue.
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Client-side manager; ``get_queue`` is registered by name only."""


# No callable here: the client only needs to know the type id.
QueueManager.register('get_queue')

# Guarded so that importing this module does not open a network connection.
if __name__ == '__main__':
    # Connect to an explicit host: an empty host is only meaningful when
    # binding, and connecting to it is not portable (it fails on Windows).
    # Replace 127.0.0.1 with the server's address when it runs elsewhere.
    m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra')
    m.connect()
    queue = m.get_queue()
    print(queue.get())  # 'hello'
# Example: one queue used by a local child process and served to remote
# clients through a manager at the same time.
from multiprocessing import Process, Queue
from multiprocessing.managers import BaseManager


class Worker(Process):
    """Child process that puts a single greeting on the queue it was given."""

    def __init__(self, q):
        # Initialise the Process machinery before adding our own attributes.
        super().__init__()
        self.q = q

    def run(self):
        """Process body: put one message on the queue."""
        self.q.put('local hello')


class QueueManager(BaseManager):
    """Manager that exposes the shared queue as ``get_queue()``."""


if __name__ == '__main__':
    queue = Queue()
    w = Worker(queue)
    w.start()

    # Registered here because the lambda closes over the queue created above.
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    # Blocks forever serving remote clients.
    s.serve_forever()
>>> a = manager.list() >>> b = manager.list() >>> a.append(b) # referent of a now contains referent of b >>> print(a, b) [<ListProxy object, typeid 'list' at ...>] [] >>> b.append('hello') >>> print(a[0], b) ['hello'] ['hello']
# Fragment: ``manager`` is expected to be an already-running Manager.
lproxy = manager.list()
lproxy.append({})
# Fetch the stored dict and modify it locally.
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# Assign it back to trigger the remote sync.
lproxy[0] = d
processes 是工作进程数,默认为 os.cpu_count() 的返回值。如果 initializer 不为 None,则每个工作进程会在启动时调用 initializer(*initargs)。maxtasksperchild 表示每个工作进程的寿命:处理完指定数量的任务后,该进程会退出并由一个新的工作进程替换;默认为 None,表示工作进程和 pool 对象同寿命。context 用于指定启动工作进程时所使用的上下文(即进程启动方式,如 fork、spawn、forkserver),一般不需要设置。
# Tour of the Pool API: apply_async, map, imap, imap_unordered, starmap and
# map_async.
import time
import os
from multiprocessing import Pool, TimeoutError


def f(x, y=666):
    """Return ``x * y``; ``y`` defaults to 666."""
    return x * y


if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Submit a single task.
        res = pool.apply_async(f, (20,))
        # Fetch the result; TimeoutError is raised if it is not ready
        # within one second.
        print(res.get(timeout=1))

        # Catch the timeout: the task sleeps longer than we wait.
        res = pool.apply_async(time.sleep, (5,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("获取结果超时,可能是函数没有执行完!")

        # Submit several tasks.
        multiple_results = [pool.apply_async(os.getpid, ()) for _ in range(4)]
        print([r.get() for r in multiple_results])

        # map is the most convenient way to submit many tasks at once.
        print("map: ", pool.map(f, range(10)))

        # Like map, but lazy: results are returned through an iterator.
        # Intended for large workloads, best combined with the third argument
        # (chunksize, the number of items sent to the pool at a time;
        # default 1).
        print("imap: ", list(pool.imap(f, range(10))))

        # map and imap return results in input order; imap_unordered does not.
        print("imap_unordered: ", list(pool.imap_unordered(f, range(10))))

        # Like map, but unpacks each item into several positional arguments.
        print("starmap: ", pool.starmap(f, [(1, 1), (2, 2), (3, 3)]))

        # map and starmap also have non-blocking variants (map_async,
        # starmap_async); map_async is shown here.
        pool.map_async(f, range(10), callback=lambda x: print("map_async: ", x))

        # Stop accepting new tasks.
        pool.close()
        # Wait for all submitted tasks to finish.
        pool.join()
注意:使用进程池时,创建进程池的代码必须放在 if __name__ == '__main__': 之下(在使用 spawn 启动方式的平台上,例如 Windows,子进程会重新导入主模块,没有这个保护会导致重复创建进程)。
# Cautionary example: joining a child that still has data buffered for a
# Queue deadlocks. The ordering below is intentionally wrong.
from multiprocessing import Process, Queue


def f(q):
    """Put one large (1,000,000 character) string on ``q``."""
    q.put('X' * 1000000)


if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    # The child cannot exit until its buffered item has been flushed to the
    # underlying pipe, and that only happens once someone reads it. Joining
    # first therefore never returns. To fix it, call queue.get() before
    # p.join() (or drop the join).
    p.join()                    # this deadlocks
    obj = queue.get()
# Anti-example: the child relies on the module-level name ``lock`` instead of
# receiving the lock as an argument.
from multiprocessing import Process, Lock


def f():
    """Placeholder task that would use the global ``lock``."""
    # ... do something using "lock" ...
    # NOTE: ``lock`` is only created under the __main__ guard below, so a
    # child that re-imports this module (spawn start method) will not have
    # it. Pass the lock through ``args`` instead.


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()
# Recommended form: hand the lock to each child explicitly via ``args``.
from multiprocessing import Process, Lock


def f(l):
    """Placeholder task that would use the lock ``l`` passed in by the parent."""
    # ... do something using "l" ...


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()
# Example: a process pool in which every worker process holds its own proxy.
import requests
import time
import random
import os
import multiprocessing
from queue import Empty

# Proxy list; each entry has the form "http://user:passwd@ip:port".
ips2 = []


def get(a, q):
    """Pool task: fetch httpbin through this worker's proxy.

    Args:
        a: value sent as the ``a`` query parameter.
        q: queue of spare proxies, used to replace the current one on failure.

    Returns:
        The response body as text, or None if the request failed or this
        worker has no proxy.
    """
    global ip
    if not ip:
        # The initializer found no proxy for this worker; nothing to do.
        return None
    # Shows that each worker pid maps to exactly one proxy.
    print(os.getpid(), os.getppid(), ip)
    try:
        return requests.get(
            'http://www.httpbin.org/get',
            params={'a': a},
            proxies={"http": ip, "https": ip},
            timeout=5,
        ).text
    except Exception as e:
        # Demo only: switch proxy on the first error. Real code would want
        # several failures before giving up on a proxy.
        try:
            # Non-blocking: a blocking get() would hang this worker forever
            # once the spare proxies are used up.
            ip = q.get_nowait()
        except Empty:
            pass
        print(e, ip)
        return None


def get_ip(q):
    """Pool initializer: take one proxy from ``q`` for this worker process."""
    global ip
    try:
        # Non-blocking so the pool still starts when there are fewer proxies
        # than workers (or none at all).
        ip = q.get_nowait()
    except Empty:
        ip = ''
        print('代理列表为空, 请添加代理后再开始采集!')


if __name__ == "__main__":
    with multiprocessing.Manager() as manage:
        # Use a manager queue: the queue is also passed as a task argument,
        # and a plain multiprocessing.Queue cannot be sent to pool tasks.
        q = manage.Queue(len(ips2))
        for i in ips2:
            q.put(i)
        print(q.qsize())
        with multiprocessing.Pool(processes=4, initializer=get_ip, initargs=(q,)) as pool:
            # starmap blocks until every task has finished.
            result = pool.starmap(get, [(i, q) for i in range(10)])
        for i in result:
            print(i)
        print(q.qsize())
# Example crawler: list pages are fetched first, and their callbacks schedule
# the detail pages on the same pool. Every worker process holds its own proxy.
import requests
import time
import random
import os
import multiprocessing
from lxml import etree
from queue import Empty

# Proxy list; fill it in before running.
ips2 = []


def _fetch_html(url, q, retry_key):
    """Fetch ``url`` through this worker's proxy and return the body text.

    On any request error the failure is printed, the worker tries to take a
    replacement proxy from ``q``, and ``Exception(retry_key)`` is raised so
    the pool's error callback can resubmit the task.
    """
    global ip
    try:
        resp = requests.get(url, proxies={"http": ip, "https": ip}, timeout=5)
    except Exception as e:
        pid = os.getpid()
        print(f"进程({pid})出现异常, 使用ip:{ip}")
        print(e)
        try:
            ip = q.get_nowait()
        except Empty:
            # No spare proxy: keep the current one.
            pass
        # The message is what the error callback reads to retry the task.
        raise Exception(retry_key)
    return resp.text


def get_index(page, q):
    """Pool task: fetch list page ``page`` and return its detail-page URLs.

    Returns None when this worker has no proxy. Raises ``Exception(str(page))``
    when the request fails.
    """
    if not ip:
        return None
    print("进程ID:", os.getpid(), ",开始采集列表页, 页码:", page, ",使用ip: ", ip)
    html = _fetch_html(f'https://ssr1.scrape.center/page/{page}', q, str(page))
    doc = etree.HTML(html)
    hrefs = doc.xpath('//a[@class="name"]/@href')
    # Only relative links are kept; they are made absolute here.
    detail_urls = ["https://ssr1.scrape.center" + url for url in hrefs if not url.startswith('http')]
    time.sleep(1.5)
    return detail_urls


def get_detail(url, q):
    """Pool task: fetch detail page ``url`` and return ``"<url>,<directors>"``.

    Director names are joined with ``|``. Returns None when this worker has no
    proxy. Raises ``Exception(url)`` when the request fails.
    """
    if not ip:
        return None
    print("进程ID:", os.getpid(), ",开始采集详情页, url:", url, ",使用ip: ", ip)
    html = _fetch_html(url, q, url)
    doc = etree.HTML(html)
    results = doc.xpath('//div[@class="directors el-row"]//p/text()')
    directors = '|'.join([i.strip() for i in results])
    time.sleep(1.5)
    return url + ',' + directors


def save_result(results):
    """Success callback for detail tasks: print the scraped line."""
    print("获取的导演结果:", results)


def get_ip(q):
    """Pool initializer: take one proxy from ``q`` for this worker process."""
    global ip
    try:
        ip = q.get_nowait()
    except Empty:
        ip = ''
        print('代理列表为空, 请添加代理后再开始采集!')


def index_callback(detail_urls):
    """Success callback for list tasks: schedule one detail task per URL."""
    if not detail_urls:
        return
    for url in detail_urls:
        pool.apply_async(get_detail, args=(url, q), callback=save_result, error_callback=detail_error_callback)


def index_error_callback(e):
    """Error callback for list tasks: resubmit the page named in ``e``."""
    pool.apply_async(get_index, (str(e), q), callback=index_callback, error_callback=index_error_callback)


def detail_error_callback(e):
    """Error callback for detail tasks: resubmit the URL named in ``e``."""
    pool.apply_async(get_detail, (str(e), q), callback=save_result, error_callback=detail_error_callback)


if __name__ == "__main__":
    manage = multiprocessing.Manager()
    q = manage.Queue(len(ips2))
    for i in ips2:
        q.put(i)
    t = time.time()
    # ``pool`` and ``q`` are module globals: the callbacks above use them.
    pool = multiprocessing.Pool(processes=20, initializer=get_ip, initargs=(q,))
    for i in range(5):
        pool.apply_async(get_index, args=(i, q), callback=index_callback, error_callback=index_error_callback)
    # Callbacks keep submitting new tasks, so the pool cannot be closed yet.
    # NOTE(review): this polls the private ``_cache`` attribute, presumably the
    # pool's table of unfinished jobs; confirm when upgrading Python.
    while pool._cache:
        time.sleep(0.1)
    pool.close()
    pool.join()
    print('采集完成,耗时:', time.time() - t)