Purpose: use asynchronous execution in a crawler to achieve high-performance data scraping.
Copyright notice: this article is original work by CSDN blogger "ThinkWon", licensed under CC 4.0 BY-SA; reposts must include the original source link and this notice.
Original link: https://blog.csdn.net/ThinkWon/article/details/102021274
Benefit: each blocking operation can be given its own process or thread, so blocking work runs asynchronously.
Drawback: threads and processes cannot be spawned without limit.
Calling the Thread class directly

```python
from threading import Thread  # thread class

def fun1():
    for i in range(5):
        print(i, end=' ')

def fun2():
    for i in range(5, 10):
        print(i, end=' ')

if __name__ == '__main__':
    t1 = Thread(target=fun1)
    t1.start()  # runs in the child thread
    fun2()      # runs in the main thread
```
Output (the interleaving varies between runs because thread scheduling is nondeterministic):
05 6 7 8 9 1 2 3 4
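`start()` only launches the thread; if the main thread needs the child's work to finish before continuing, call `join()`. A minimal sketch (the `worker` function and `results` list are illustrative names, not from the original):

```python
from threading import Thread

results = []

def worker():
    # runs in the child thread
    results.append(sum(range(5)))

t = Thread(target=worker)
t.start()
t.join()  # block the main thread until the child finishes
print(results)  # [10]
```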
Implementing it via inheritance: subclass Thread and override the run method

```python
from threading import Thread

def fun1():
    for i in range(5):
        print(i, end=' ')

def fun2():
    for i in range(5, 10):
        print(i, end=' ')

# Subclass Thread and override run
class MyThread(Thread):
    def run(self) -> None:
        fun2()

if __name__ == '__main__':
    t = MyThread()
    t.start()  # start the thread; run() executes in the child thread
    fun1()
```
Output:
50 6 71 2 8 3 4 9
Thread pool: create a batch of threads up front; we submit tasks directly to the pool, and the pool handles task scheduling.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fn(name):
    time.sleep(2)
    print(name)

if __name__ == '__main__':
    startTime = time.time()
    # Create a thread pool with 1000 workers
    with ThreadPoolExecutor(1000) as t:
        for i in range(1000):
            t.submit(fn, name=f'thread {i}')
    # Leaving the with-block waits until every task in the pool has
    # finished, and only then does the code below run
    endTime = time.time()
    print(f'elapsed {endTime - startTime}')
```
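`submit()` also returns a `concurrent.futures.Future`, so return values can be collected instead of just printed. A small sketch with an illustrative `square` function (not part of the original example):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
    # Each submit returns a Future; result() retrieves the return value
    futures = [pool.submit(square, i) for i in range(5)]
    # as_completed yields futures in completion order, so sort for display
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [0, 1, 4, 9, 16]
```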
```python
import time
from multiprocessing.dummy import Pool  # thread-backed Pool

start_time = time.time()

def fake_process(name):
    print('running:', name)
    time.sleep(2)
    print('finished:', name)
    return name

process_list = ['1', '2', '3', '4']
pool = Pool()
str_list = pool.map(fake_process, process_list)
end_time = time.time()
print('elapsed:', end_time - start_time)
print('pool results:', str_list)
```
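`multiprocessing.dummy.Pool` is a thread-backed pool that mirrors the `multiprocessing.Pool` API; `map` blocks until all tasks finish and preserves input order even when tasks complete out of order. A minimal sketch (the `double` function is illustrative):

```python
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool, but threads

def double(n):
    return n * 2

with Pool(4) as pool:
    # map blocks until every task finishes and keeps input order
    out = pool.map(double, [1, 2, 3, 4])

print(out)  # [2, 4, 6, 8]
```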
```python
from multiprocessing import Process  # process class

def fun1(arg):
    for i in range(10000):
        print(arg, i)

def fun2(arg):
    for i in range(10000):
        print(arg, i)

if __name__ == '__main__':
    p1 = Process(target=fun1, args=('process-1',))
    p2 = Process(target=fun2, args=('process-2',))
    p1.start()
    p2.start()
```
When passing arguments to a process or thread, `args` must be a tuple, e.g. `args=('process-1',)` — note the trailing comma needed for a one-element tuple.
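The trailing comma is what makes a one-element tuple; without it, `args` would be a bare string, which Thread would unpack character by character. A small sketch (the `show` function and `received` list are illustrative names):

```python
from threading import Thread

received = []

def show(arg):
    received.append(arg)

# Correct: a one-element tuple needs the trailing comma
t = Thread(target=show, args=('process-1',))
t.start()
t.join()
print(received)  # ['process-1']

# args=('process-1') is just the string 'process-1'; Thread would try to
# unpack it into nine single-character arguments and raise a TypeError.
```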
Implementing it via inheritance: subclass Process and override the run method

```python
from multiprocessing import Process  # process class

class MyProcess(Process):
    def __init__(self, arg):
        super(MyProcess, self).__init__()
        self.arg = arg

    def run(self) -> None:
        for i in range(10000):
            print(self.arg, i)

if __name__ == '__main__':
    p1 = MyProcess('process-1')
    p2 = MyProcess('process-2')
    p1.start()
    p2.start()
```
Example: scrape all the data from the Xinfadi price-quote page, 新发地-价格行情 (xinfadi.com.cn).
Code:
```python
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from fake_useragent import UserAgent
from tqdm import tqdm

# Scrape one page of data
def download_one_page(data: dict):
    url = 'http://www.xinfadi.com.cn/getPriceData.html'
    headers = {
        'user-agent': UserAgent().random,
        'referer': 'http://www.xinfadi.com.cn/priceDetail.html'
    }
    resp = requests.post(url=url, headers=headers, data=data)
    data_list = resp.json().get('list')
    # Save the data
    with open('北京新发地.csv', 'a', encoding='utf-8') as fp:
        for elem in tqdm(data_list,
                         desc=f'downloading page {data["current"]}, status code {resp.status_code}',
                         ascii=True):
            info = (elem['prodCat'], elem['prodPcat'], elem['prodName'],
                    elem['lowPrice'], elem['avgPrice'], elem['highPrice'],
                    elem['specInfo'], elem['place'], elem['unitInfo'],
                    elem['pubDate'])
            fp.write(','.join(info) + '\n')

def download_pages(page_start: int, page_end: int, page_limit: int = 20):
    # Write the CSV header row first
    title = ['category', 'subcategory', 'product name', 'lowest price',
             'average price', 'highest price', 'spec', 'origin', 'unit',
             'publish date']
    with open('北京新发地.csv', 'w', encoding='utf-8') as fp:
        fp.write(','.join(title) + '\n')
    with ThreadPoolExecutor(2048) as t:
        for i in range(page_start, page_end + 1):
            data = {
                'limit': f'{page_limit}',
                'current': f'{i}',
                'pubDateStartTime': '',
                'pubDateEndTime': '',
                'prodPcatid': '',
                'prodCatid': '',
                'prodName': ''
            }
            t.submit(download_one_page, data)

if __name__ == '__main__':
    start_time = time.time()
    download_pages(page_start=1, page_end=100, page_limit=20)
    end_time = time.time()
    print(f'total time {end_time - start_time}s')
```
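One caveat about the code above: many threads append to the same CSV file at once, and concurrent appends can interleave mid-line. A sketch of one way to serialize the writes with `threading.Lock` (the `write_row` helper and the temporary path are illustrative, not from the original):

```python
import os
import tempfile
import threading

_write_lock = threading.Lock()

def write_row(path, fields):
    # Hold the lock across the append so rows written from different
    # threads never interleave mid-line
    with _write_lock:
        with open(path, 'a', encoding='utf-8') as fp:
            fp.write(','.join(fields) + '\n')

path = os.path.join(tempfile.mkdtemp(), 'demo.csv')
threads = [threading.Thread(target=write_row, args=(path, [str(i), str(i)]))
           for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
with open(path, encoding='utf-8') as fp:
    lines = sorted(fp.read().splitlines())
print(lines)  # ['0,0', '1,1', '2,2', '3,3', '4,4']
```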
Copyright belongs to: 瞌学家; please credit the source when reposting.
Article link: https://songonline.top/archives/145/