原文链接:https://docs.python.org/zh-cn/3/library/multiprocessing.html
其实原文就是中文的,这篇文章甚至谈不上翻译,只是照抄一遍罢了。scrapy用多了,回顾一下requests加多进程如何使用。
有一批固定的代理,假设100个。有一批起始url5000个,每个起始url可以获取到若干个详情url。使用多进程获取所有详情页的内容
想法1:最直接的方法就是创建100个进程,每个进程传入一个代理ip,在创建一个url队列,这100个进程一直在消费队列和往队列存放新的url。当某个进程报错(网络之类的错误)达到一定次数则结束该进程
想法2:能不能使用进程池来操作。但是有个问题,如何给进程池内的每个进程分配一个固定的代理呢?固定虽然做不到,但可以把代理也放入一个队列,获取url的时候从队列中取出一个代理,用完再放回队列。
上面两个想法应该都可以实现。但是我的这100个代理可能都是静态代理,质量比较好,我想把它们压榨干净。所以我一直在纠结能不能给进程池的进程固定分配一个代理。
为什么进程池就不能先分配一个资源呢?一堆工人就不能只用一个顺手的工具来做任务,工具坏了才换一个,我感觉这才合理。于是就只能看官网文档有没有这种方法(代码见最后,如果你有更好的想法或者觉得我的代码可以再优化欢迎给出建议)
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3]))
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join()
multiprocessing.Queue接口基本同queue.Queue一样,只是缺少了task_done()和join()两个方法
当一个对象被放入队列时,会先被后台线程用pickle序列化,再将序列化的数据通过管道传递给队列。将一个对象放入空队列时,需要一个极小的延迟才能使empty
方法返回False,推荐使用get_nowait
方法
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
警告:如果一个进程在使用队列期间,被terminate或者kill方法终止了,该队列的数据很可能已经损坏,其他进程操作该队列会触发异常。
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
注意:不能在不同进程同时读写管道的同一端(比如多个进程同时操作parent_conn),这会导致管道内的数据异常。如果每个进程只读写某一端的数据,则不会有问题。也就是说返回的parent_conn, child_conn最好分配给两个进程分别使用
多进程时应尽量避免锁的存在,可以使用管道或队列来传输数据
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
Manager()支持的所有类型: list 、 dict 、Namespace 、Lock 、RLock 、Semaphore 、BoundedSemaphore 、 Condition 、Event 、Barrier 、Queue 、Value 和 Array
自定义数据类型
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) # prints 7 print(maths.mul(7, 8)) # prints 56
建议多使用队列和管道来传输数据,尽可能的避免使用共享内存和Manager,当然,如果你非要用也行。
创建服务端
from multiprocessing.managers import BaseManager from queue import Queue queue = Queue() class QueueManager(BaseManager): pass QueueManager.register('get_queue', callable=lambda:queue) m = QueueManager(address=('', 50000), authkey=b'abracadabra') s = m.get_server() s.serve_forever()
客户端1
往远程队列中存放’hello’
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_queue') m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra') m.connect() queue = m.get_queue() queue.put('hello')
客户端2
取出远程队列的数据。这样就实现了两个远程进程间的数据交互
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_queue') m = QueueManager(address=('127.0.0.1', 50000), authkey=b'abracadabra') m.connect() queue = m.get_queue() print(queue.get()) # 'hello'
服务端也做客户端
开始创建一个进程往队列里存放数据,然后开启远程服务器。之后用客户端2就可以获取到队列里进程存放的数据了
from multiprocessing import Process, Queue from multiprocessing.managers import BaseManager class Worker(Process): def __init__(self, q): self.q = q super().__init__() def run(self): self.q.put('local hello') class QueueManager(BaseManager): pass if __name__ == '__main__': queue = Queue() w = Worker(queue) w.start() QueueManager.register('get_queue', callable=lambda: queue) m = QueueManager(address=('', 50000), authkey=b'abracadabra') s = m.get_server() s.serve_forever()
manager创建的列表对象和列表基本是一样的,允许相互嵌套使用。当然你可以嵌套其他数据类型,比如字典或者manager.dict()
>>> a = manager.list() >>> b = manager.list() >>> a.append(b) # referent of a now contains referent of b >>> print(a, b) [<ListProxy object, typeid 'list' at ...>] [] >>> b.append('hello') >>> print(a[0], b) ['hello'] ['hello']
注意:嵌套的可变对象(列表、字典)里的数据不会自动同步到远程。需要在更改后重新赋值给manager
lproxy = manager.list() lproxy.append({}) d = lproxy[0] d['a'] = 1 d['b'] = 2 # 重新赋值触发远程同步 lproxy[0] = d
processes是进程数,默认为 os.cpu_count()的值。如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)。maxtasksperchild表示寿命(处理多少个任务就自己挂了),默认为None,和pool对象同寿命。context这个没什么软用,不用管。
注意,进程池对象的方法只有创建它的进程能够调用。也就是说无法通过参数传递给其他进程
警告:不要期望垃圾回收机制来回收pool对象,应该手动调用close
或terminate
。建议使用with
下面是pool对象的一些方法:
math.ceil(len(iterable)/len(processes))
。如果任务很大,建议使用imap或者imap_unordered并指定chunksizepool对象中包含_async方法返回的对象,比如pool.apply_async()
import time import os from multiprocessing import Pool, TimeoutError def f(x, y=666): return x*y if __name__ == '__main__': with Pool(processes=4) as pool: # 添加一个任务 res = pool.apply_async(f, (20,)) # 获取执行结果,如果一秒后进程还没有返回结果则抛出TimeoutError异常 print(res.get(timeout=1)) # 捕获 res = pool.apply_async(time.sleep, (5,)) try: print(res.get(timeout=1)) except TimeoutError: print("获取结果超时,可能是函数没有执行完!") # 添加多个任务 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get() for res in multiple_results]) # map添加最为方便,功能同上 print("map: ", pool.map(f, range(10))) # 和map一样,但会延迟添加任务,类似的函数还有imap。 # 一般用于大任务量时,配合第三个参数使用更佳(每次添加到池中的块大小,默认1) print("imap: ", list(pool.imap(f, range(10)))) # map和imap都会保证结果的顺序(对应range(4)),而imap_unordered则不会 print("imap_unordered: ", list(pool.imap_unordered(f, range(10)))) # 和map一样,传多个参数时建议使用 print("starmap: ", pool.starmap(f, [(1,1),(2,2),(3,3)])) # 前面四个map都是有不阻塞的版本的,以map为例 pool.map_async(f, range(10), callback=lambda x: print("map_async: ", x)) # 不再添加任务 pool.close() # 等待所有任务执行完成 pool.join()
注意:使用进程池时代码必须加上if name == ‘main’:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()
第二种
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
其实主要是创建Pool的时候的两个参数initializer和initargs。进程会在创建的时候执行一次initializer(*initargs)。另外,想要在其他函数内使用该函数的变量,需要指定global关键字
import requests import time import random import os import multiprocessing # 代理列表,代理格式:"http://user:passwd@ip:port" ips2 = [] # 工作进程(进程池的进程)的任务函数 def get(a, q): global ip # 可以看到进程id和ip是一一对应的 print(os.getpid(), os.getppid(), ip) try: return requests.get('http://www.httpbin.org/get', params={'a':a}, proxies={"http":ip,"https":ip},timeout=5).text # except Exception as e: # 报错直接换个代理,只是演示,实际肯定需要报错多次才换 ip = q.get() print(e, ip) return # 工作进程初始化执行的函数 def get_ip(q): global ip ip = q.get() if __name__ == "__main__": manage = multiprocessing.Manager() # 进程池需要使用manage.Queue而不是multiprocessing.Queue q = manage.Queue(len(ips2)) for i in ips2: q.put(i) print(q.qsize()) pool = multiprocessing.Pool(processes = 4, initializer=get_ip, initargs=(q,)) result = pool.starmap(get,[(i, q) for i in range(10)]) for i in result: print(i) print(q.qsize())
实际上,所有的工作进程都可以直接访问到ips这个列表。但是应该如果分配给这些工作进程,它们既没有顺序也没有名字,唯一的标识就是pid,拿pid映射到列表的某个索引可不可以呢。估计要解决这个问题,只能看进程池实现的代码了。但是官方的源码不是一般人能看的懂的,我肯定是不太行。
上面只演示了如果给进程池的每个进程分配一个固定的ip,并没有更新任务到队列。实际上,对于列表页和详情页的处理会很不一样,一般需要两个不同的函数来处理,更新任务到队列的处理可能会比较麻烦。
以崔佬的练习平台为例:https://ssr1.scrape.center/。获取前5页的的所有电影中的导演。
完整代码如下:
import requests import time import random import os import multiprocessing from lxml import etree from queue import Empty ips2 = [] def get_index(page, q): global ip if not ip: return print("进程ID:", os.getpid(), ",开始采集列表页, 页码:", page, ",使用ip: ", ip) try: resp = requests.get(f'https://ssr1.scrape.center/page/{page}', proxies={"http":ip,"https":ip}, timeout=5) except Exception as e: pid = os.getpid() print(f"进程({pid})出现异常, 使用ip:{ip}") print(e) try: ip = q.get_nowait() except Empty: pass raise Exception(str(page)) else: html = resp.text doc = etree.HTML(html) detail_urls = doc.xpath('//a[@class="name"]/@href') detail_urls = ["https://ssr1.scrape.center" + url for url in detail_urls if not url.startswith('http')] time.sleep(1.5) return detail_urls def get_detail(url, q): global ip if not ip: return print("进程ID:", os.getpid(), ",开始采集详情页, url:", url, ",使用ip: ", ip) try: resp = requests.get(url, proxies={"http":ip,"https":ip}, timeout=5) except Exception as e: pid = os.getpid() print(f"进程({pid})出现异常, 使用ip:{ip}") print(e) try: ip = q.get_nowait() except Empty: pass raise Exception(url) else: html = resp.text doc = etree.HTML(html) results = doc.xpath('//div[@class="directors el-row"]//p/text()') directors = '|'.join([i.strip() for i in results]) time.sleep(1.5) return url + ',' + directors def save_result(results): print("获取的导演结果:", results) def get_ip(q): global ip try: ip = q.get_nowait() except Empty: ip = '' print('代理列表为空, 请添加代理后再开始采集!') def index_callback(detail_urls): if not detail_urls: return for url in detail_urls: pool.apply_async(get_detail, args=(url, q), callback=save_result, error_callback=detail_error_callback) def index_error_callback(e): pool.apply_async(get_index, (str(e), q), callback=index_callback, error_callback=index_error_callback) def detail_error_callback(e): pool.apply_async(get_detail, (str(e), q), callback=save_result, error_callback=detail_error_callback) if __name__ == "__main__": manage = multiprocessing.Manager() q = manage.Queue(len(ips2)) for i in ips2: q.put(i) t = time.time() pool = multiprocessing.Pool(processes = 20, initializer=get_ip, initargs=(q,)) for i in range(5): pool.apply_async(get_index, args=(i, q), callback=index_callback, error_callback=index_error_callback) while(pool._cache): time.sleep(0.1) pool.close() pool.join() print('采集完成,耗时:', time.time() - t)
这个代码利用回调函数来不断往进程池加入新任务,如果代码出现问题,可能会出现死循环的情况。所以运行之前注意理清逻辑。另外,回调函数是在主进程中执行的,如果包含阻塞的代码,则后面的回调函数会一直在等待状态。所以不要在回调函数内干耗时的操作,可以将这些操作写在进程函数内。
ips2列表中的代理需大于进程数,不然会出现采集数据为None的情况。如果进程没有拿到代理则不给他分配任务,这个想要实现比较麻烦。暂时没有想要一个好的解决方法。
如果代理只是比进程数少了一点点,对于没有拿到代理的进程拿到任务时,可以通过抛出异常让代理池重新分配给进程(见error_callback两个函数的代码)。这可能会导致死循环,因为代理池可能又把任务分配给了没有拿到代理的进程,又会将任务抛出。所以这种方法只针对代理量相对进程来说只是少了一点点的情况。
如何判断进程池的任务是否执行完了,这个官方文档中没有提到相关的接口,但是我通过查看源码发现了几个属性可以进行判断,不知道是不是最佳的方法,有清楚的还请不吝赐教。