前言:文章仅用于学习交流,不足之处欢迎小伙伴指正!
已实现功能:
1、爬取搜狗微信上的分类一栏的所有事件及其他的所有标题事件和加载更多,返回文章链接与标题,并存入数据库中,后续可直接根据链接下载文章。
2、根据输入内容定向爬取文章,返回链接与标题。
待实现功能:
1、根据数据库中的链接爬取公众号的所有相关文章,保存于数据库,并对所有文章分类存档。
2、实现UI界面(PyQt5),根据需要对程序打包为可执行文件。
selenisum(实现简单的搜狗微信主页源码获取)
代理池原理 (给定ip源,简单的判断ip是否可用)
线程池原理 (实现多线程爬取,增加爬取效率)
oo面向思想 (面向对象编程,分模块编程,增加可读性,方便后期维护)
一些常用的库 (os、time、random、requests、pymysql、queue、threading)
1、main.py:该文件为程序的入口,即启动文件,功能:几个模块之间的数据传输的中转站。
2、ip_pool.py: 该文件是ip代理池文件, 功能:将给定的ip列表进行筛选,选出可用的ip方便后续请求。
2、spider_page.py: 该文件用于提取网页源码, 功能:selenisum去获取主页源码。
3、settings.py:该文件为配置文件, 功能:我们在程序启动之前可以选择爬虫的爬取方向,当然这个方向是由自己给定的。
4、is_API_Run.py: 该文件是API执行文件,功能:根据settings文件的配置输入,从而决定执行那些API函数。
5、API_Function.py: 该文件是API文件,功能:实现了各个API函数的功能实现。
6、thread_pool.py:该文件为线程池文件,功能:实现了从数据库中获取链接,去多线程的下载文章。
1、在我们开始之前,让我们先了解下搜狗微信的主页面:https://weixin.sogou.com/ 若已了解则跳过这一步。
2、文章这里以主页中的 “搜索热词”、“编辑精选”、“热点文章”、“热门”举例演示。
3、从main文件的程序入口启动程序,首先该文件导入了一些包和编写的其他模块文件,并编写了相应的两个函数,分别是panding()函数、ip()函数,对应功能见代码注释,其他的一些模块文件在后边会有解释
文件名:main.py
''' 启动程序 ''' #导包和导入自定义模块文件 #自己编写的模块文件 from spider_page import __init__source from is_API_Run import read_settings_run_func from thread_pool import Open_Thread_Assign_Links from ip_pool import ip_Pool #python包 import os import time import random def ip(IP_POOL): ''' 从IP_POOL(ip池)随机选出一个ip将其返回给主程序使用, :param IP_POOL: IP池 :return: proxies ------随机的一个代理ip ''' proxies = { "http": random.choice(IP_POOL), } return proxies def panding(proxies): '''在此处实现:判断工作文件路径下的page_source目录下是否有名为当日的文件(20210511.txt)--年月日的txt文件, 判断文件名是否符合要求(一天之内生成的) 若有则读取出来赋值page,返回给主程序使用,若没有则执行page=__init__source().html_source语句。 获取网页源代码,再将其命名保存,并且返回给主程序使用''' work_path = os.getcwd() # 获取工作路径 html_path = os.path.join(work_path, 'page_source') if len(os.listdir(html_path)) > 0: # 判断路径下是否有文件 flag = False for x in os.listdir(html_path): # 罗列出路径下所有的文件 print("x:", x) if x == (time.strftime(r'%Y%m%d') + r'.txt'): # 找出当天生成的文件 print("x:", x) with open(os.path.join(html_path, x), 'r', encoding="utf-8") as f: page = f.read() return page # 没有当天的文件就创建文件并写入html源码 page = __init__source(proxies).html_source with open(os.path.join(html_path, time.strftime(r'%Y%m%d') + r'.txt'), 'w', encoding="utf-8") as f: f.write(page) else: page = __init__source(proxies).html_source with open(os.path.join(html_path, time.strftime(r'%Y%m%d') + r'.txt'), 'w', encoding="utf-8") as f: f.write(page) return page #将源码返回给主程序使用 if __name__ == '__main__': IP_POOL = ip_Pool() # 启动ip代理池 (初始化ip类,并生成代理池对象,接下来从对象中获取ip池---IP_POOL.ip_pool) proxies = ip(IP_POOL.ip_pool) # 随机从ip池中拿出一个代理ip---proxies page = panding(proxies) # 初始化源码 (用代理ip请求主页源码,并保存于程序工作目录下的page_source文件夹下的以年月日为名的txt文件。) read_settings_run_func(page, proxies) # 读取配置文件并执行相应的API函数。 Open_Thread_Assign_Links(proxies) # 开启线程池下载公众号文章
4、然后解释一下ip代理池这个模块文件,该文件实现了动态的从网上的ip源获取可用ip,并初始化IP代理池为5个ip,然后会在主程序运行过程中一直对ip池中的ip进行可用性验证,若ip过期不可用,那将该ip从ip代理池中删除,再从ip源获取可用ip补充ip代理池。具体功能实现见代码。该文件模块可以拿出来单独使用,并不局限于单独的某个程序。单独使用方法,后面会有具体说明。
文件名:ip_pool.py
# 导包 import requests import time import re import threading # 定义测试的url链接,这里暂且选用www.baidu.com url = 'https://www.baidu.com/' # 定义测试的url链接www.baidu.com的请求头。 headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Cache-Control': 'max-age=0', 'Connection': 'keep-alive', 'Cookie': 'BIDUPSID=EAAEC44F956EC0051F3EB986A600267F; PSTM=1618841564; BD_UPN=12314753; __yjs_duid=1_fc17df5ce48c903e96c35412849fa9c21618841573172; BDUSS=F2Yn5VfmNzMFkwNjE3TzNKY0V3UVJUUW0wOUFDTVVEdU82cG9Id1lmSzEwSzFnSUFBQUFBJCQAAAAAAAAAAAEAAADrnKS8vsXOsr3wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAALVDhmC1Q4ZgY0; BDUSS_BFESS=F2Yn5VfmNzMFkwNjE3TzNKY0V3UVJUUW0wOUFDTVVEdU82cG9Id1lmSzEwSzFnSUFBQUFBJCQAAAAAAAAAAAEAAADrnKS8vsXOsr3wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAALVDhmC1Q4ZgY0; BAIDUID=FAD97B32882628A65DB481B25993EEA8', 'Host': 'www.baidu.com', 'Referer': 'https', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'cross-site', 'Sec-Fetch-User': '?1', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36 SLBrowser/7.0.0.4071 SLBChan/15'} # 对url网站发起请求,根据返回体的状态码判断是否请求成功,若成功则该ip可用,若不成功则ip不可用,将ip从ip池中移除。 # 那么这个ip池要保证在整个程序运行用ip皆可用 # ip供应源(ip_source) ip_source = 'ip源' # 初始化ip池为空(Ip_Pool) Ip_Pool = [] # ip的预期存留时间M M = 0 # ip代理池类 class ip_Pool: def __init__(self, ): self.ip_pool = [] # 初始化ip池属性,方便后续的ip池赋值。 threading.Thread(target=self.return_ip_pool).start() # 开启子线程动态处理ip代理池,目的:保证ip池中ip的可用性的同时,又不会影响其他程序运行。所以开辟一个子线程。 time.sleep(6) # 加这个等待时间,目的:给子线程足够的时间去获取ip,并填充ip池,保证返回给主程序的ip池不为空。 def ip_yield(self): while True: while len(Ip_Pool) < 5: # 这里的while循环语句 目的:用于检测ip池中是否始终满足个数要求 ip = requests.get(ip_source).text # 从ip源获取ip文本。 if '{' in ip: # 检测从ip源是否成功获取到ip。若未获取到ip则会返回一个包含字典样式的字符串。 continue # time.sleep(0.5) # 可以增加等待时间,以提高获取到ip的成功率 else: ip = re.match('\S*', ip).group() # 对获取到的ip文本简单处理,目的去除\r\n字符。 Ip_Pool.append(ip) # 将ip添加到ip池中。 # 暂停M分钟,并将Ip_Pool返回出去使用 yield Ip_Pool time.sleep(M) print('将IP池返回给主程序使用------' + str(M) + '半分钟后对IP池进行检测是否可用--------') #以下for循环是用于检测ip池中ip是否过期。 for x in Ip_Pool: proxies = {'http': x} try: if requests.get(url, proxies=proxies, headers=headers).status_code == 200: # print('该ip正常,还能继续使用半分钟') pass except Exception: print('存在一个ip过期,即将从IP池中去除该ip') Ip_Pool.remove(x) def return_ip_pool(self): print("ip池线程启动") for self.ip_pool in self.ip_yield(): pass
5、若需要使用该ip池模块,则需要将文件copy于使用的文件目录下,然后导入文件模块,再创建ip代理类对象,用对象获取ip池(ip_pool)。
6、咱们来回顾下,主程序启动ip代理池后得到ip池,使用ip()方法将ip池处理得到一个随机ip,将ip放入panding方法中请求得到主页源码,再调用is_API_Run.py文件模块下的read_settings_run_func类,继而传入源码创建类对象,在这个文件模块中导入API模块和配置模块,具体功能:根据配置模块创建配置类对象,从而获取到配置信息。然后由API模块创建API类对象,根据配置信息执行相应的方法。以下文件中的打印内容都可以去掉,换成pass占位即可
文件名:is_API_Run.py
''' 获取配置文件settings的输入给与相应的函数输出 ''' from API_Function import Functions from settings import configure class read_settings_run_func(): def __init__(self, page, proxies): self.proxies = proxies self.page = page self.set = configure() self.func = Functions(self.page, self.proxies) self.test_settings() def test_settings(self): if self.set.Search_hot_words == True: self.func.Search_hot_words() else: print("mei") if self.set.Editor_s_selection == True: self.func.Editor_s_selection() else: print("mei") if self.set.Hot_articles == True: self.func.Hot_articles() else: print("mei") if self.set.Custom_hotspot_source_Text == True: self.func.Custom_hotspot_source_Text(self.set.text_list) else: print("mei") if self.set.Custom_hotspot_source_Url == True: self.func.Custom_hotspot_source_Url(self.set.Custom_hotspot_source_Url_List) else: print("mei") if self.set.Hot_event_sources_of_other_websites == True: self.func.Hot_event_sources_of_other_websites(self.set.Hot_event_sources_of_other_websites_List) else: print("mei") if self.set.input_text == True: self.func.input_text(self.set.text) else: print("mei") if self.set.Crawling_article == True: self.func.Crawling_article(self.set.text) else: print("mei") if self.set.Crawling_official_account == True: self.func.Crawling_official_account() else: print("mei") if self.set.Hot == True: self.func.Hot() else: print("mei") if self.set.Funny == True: self.func.Funny() else: print("mei") if self.set.Honyaradoh == True: self.func.Honyaradoh() else: print("mei") if self.set.Private_talk == True: self.func.Private_talk() else: print("mei") if self.set.Eight_trigrams == True: self.func.Eight_trigrams() else: print("mei") if self.set.Technocrats == True: self.func.Technocrats() else: print("mei") if self.set.Financial_fans == True: self.func.Financial_fans() else: print("mei") if self.set.Car_control == True: self.func.Car_control() else: print("mei") if self.set.Life_home == True: self.func.Life_home() else: print("mei") if self.set.Fashion_circle == True: self.func.Fashion_circle() else: print("mei") if self.set.Parenting == True: self.func.Parenting() else: print("mei") if self.set.Travel == True: self.func.Travel() else: print("mei") if self.set.Workplace == True: self.func.Workplace() else: print("mei") if self.set.delicious_food == True: self.func.delicious_food() else: print("mei") if self.set.history == True: self.func.history() else: print("mei") if self.set.education == True: self.func.education() else: print("mei") if self.set.constellation == True: self.func.constellation() else: print("mei") if self.set.Sports == True: self.func.Sports() else: print("mei") if self.set.military == True: self.func.military() else: print("mei") if self.set.game == True: self.func.game() else: print("mei") if self.set.cute_pet == True: self.func.cute_pet() else: print("mei") if self.set.Home_page_hot == True: self.func.Home_page_hot() else: print("mei")
7、配置文件模块:settings.py,根据用户选择一些爬取方向,设置为True或者False,来判定相应的API函数是否执行,代码如下:
''' *****配置文件***** 作用:便于用户设置输入,程序运行时从此文件读取参数属性,以便程序 明确执行 “动作” ''' set_dir={ # 热点事件爬取(为以下三类) # 微信公众号热点来源(子集有三类) # 搜索热词 'Search_hot_words': True, # 编辑精选 'Editor_s_selection': True, # 热点文章 'Hot_articles': True, # 自定义热点来源 # 文本的形式--列表 'Custom_hotspot_source_Text': True, # 若 Custom_hotspot_source_List:True 请在此处列表添加热点来源---文本 'text_list': [], # 链接的形式--列表 'Custom_hotspot_source_Url': True, # 若 Custom_hotspot_source_Url:True 请在此处列表添加热点来源---链接 'Custom_hotspot_source_Url_List': '[]', # 其他网站的热点事件来源 # 提取文本--列表 'Hot_event_sources_of_other_websites': False, # 若 Hot_event_sources_of_other_websites:True 请在此处列表添加热点事件来源---文本 'Hot_event_sources_of_other_websites_List': [], # 输入关键字文本爬取 #在此处输入文本 'text':[], 'input_text':False, # 爬取文章 'Crawling_article':False, # 爬取公众号 'Crawling_official_account':False, # 微信公众号分类爬取--加载到更多 # 热门 'Hot':True, # 搞笑 'Funny':True, # 养生堂 'Honyaradoh':False, # 私房话 'Private_talk':False, # 八卦精 'Eight_trigrams':False, # 科技咖 'Technocrats':False, # 财经迷 'Financial_fans':False, # 汽车控 'Car_control':False, # 生活家 'Life_home':False, # 时尚圈 'Fashion_circle':False, # 育儿 'Parenting':False, # 旅游 'Travel':False, # 职场 'Workplace': False, # 美食 'delicious_food': False, # 历史 'history': False, # 教育 'education': False, # 星座 'constellation': False, # 体育 'Sports': False, # 军事 'military': False, # 游戏 'game': False, # 萌宠 'cute_pet': False, # 主页热推 'Home_page_hot': False, } class configure(): def __init__(self): self.Search_hot_words = set_dir.get('Search_hot_words') self.Editor_s_selection = set_dir.get('Editor_s_selection') self.Hot_articles = set_dir.get('Hot_articles') self.Custom_hotspot_source_Text = set_dir.get('Custom_hotspot_source_Text') self.text_list = set_dir.get('text_list') self.Custom_hotspot_source_Url = set_dir.get('Custom_hotspot_source_Url') self.Custom_hotspot_source_Url_List = set_dir.get('Custom_hotspot_source_Url_List') self.Hot_event_sources_of_other_websites = set_dir.get('Hot_event_sources_of_other_websites') self.input_text = set_dir.get('input_text') self.Crawling_article = set_dir.get('Crawling_article') self.Crawling_official_account = set_dir.get('Crawling_official_account') self.Hot = set_dir.get('Hot') self.Funny = set_dir.get('Funny') self.Honyaradoh = set_dir.get('Honyaradoh') self.Private_talk = set_dir.get('Private_talk') self.Eight_trigrams = set_dir.get('Eight_trigrams') self.Technocrats = set_dir.get('Technocrats') self.Financial_fans = set_dir.get('Financial_fans') self.Car_control = set_dir.get('Car_control') self.Life_home = set_dir.get('Life_home') self.Fashion_circle = set_dir.get('Fashion_circle') self.Parenting = set_dir.get('Parenting') self.Travel = set_dir.get('Travel') self.Workplace = set_dir.get('Workplace') self.delicious_food = set_dir.get('delicious_food') self.history = set_dir.get('history') self.education = set_dir.get('education') self.constellation = set_dir.get('constellation') self.Sports = set_dir.get('Sports') self.military = set_dir.get('military') self.game = set_dir.get('game') self.cute_pet = set_dir.get('cute_pet') self.Home_page_hot = set_dir.get('Home_page_hot')
8、API文件模块:API_Function.py 定义了一些配置文件中对应的处理函数
API_Function.py:
# 导包 import requests from bs4 import BeautifulSoup import pymysql # 后面发起请求会用到的请求头和请求url url = 'https://weixin.sogou.com/' # 搜狗微信的入口url链接 headers = {'authority': 'weixin.sogou.com', 'method': 'GET', 'path': '/', 'scheme': 'https', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'accept-encoding': 'gzip, deflate, br', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'max-age=0', 'cookie': 'ABTEST=7|1619743404|v1; IPLOC=CN3205; SUID=5E9A50754018960A00000000608B52AC; SUID=5E9A5075AF21B00A00000000608B52AC; weixinIndexVisited=1; SUV=0087AD4475509A5E608B52ADF7A81201; SNUID=CD8A73542124E19CE9C1C01321869CD8; JSESSIONID=aaaUz7AAKRNv6gHXIfJGx', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36 SLBrowser/7.0.0.4071 SLBChan/15'} class Functions(): def __init__(self, page, proxies): # 代理 self.proxies = proxies # 搜狗微信主页的网页源码 self.page = page # bs4初始化网页源码 self.soup = BeautifulSoup(self.page, 'lxml') # 连接mysql数据库,返回一个数据库对象 self.coon = pymysql.connect(user='root', password='clly0528', db='gc') # 创建游标对象,方便后续执行sql语句操作 self.cur = self.coon.cursor() # 删除已存在的同名的数据表 self.clear_database() def clear_database(self): self.cur.execute("show tables like 'urls'") if len(self.cur.fetchall()) == 0: print("创建表") # 创建表 self.cur.execute('''create table urls(id int not NULL auto_increment, href varchar(500) not NULL, title varchar(500) not NULL, primary key (id) );''') else: # 删除表 self.cur.execute("drop table urls") self.coon.commit() # 创建表 print(30 * '-' + "删除原表,创建新表!" + 30 * '-') self.cur.execute('''create table urls(id int auto_increment primary key, href varchar(500) not NULL, title varchar(500) not NULL );''') #提交保存 self.coon.commit() # *********************************************************************************************************************** def Search_hot_words(self): # 搜索热词 for tag in self.soup.select('#topwords > li > a'): # 罗列出每个tga标签对象的href属性与title属性 sql = r"insert into urls(href,title) value(" + "'" + tag.attrs['href'] + "'" + ',' + "'" + tag.attrs[ 'title'] + "'" + ")" print("搜索热词sql: ", sql) self.cur.execute(sql) self.coon.commit() def Editor_s_selection(self): # 编辑精选 p_soup_list = self.soup.select(".aside >p") for p_soup in p_soup_list: if p_soup.get_text() == "编辑精选": a_soup = p_soup.find_next("ul").select('li >.txt-box > .p1 >a') for tag in a_soup: sql = r"insert into urls(href,title) value(" + "'" + tag.attrs['href'] + "'" + ',' + "'" + \ tag.attrs[ 'title'] + "'" + ")" print("编辑精选sql: ", sql) self.cur.execute(sql) self.coon.commit() def Hot_articles(self): # 热点文章 p_soup_list = self.soup.select(".aside >p") for p_soup in p_soup_list: if p_soup.get_text() == "热点文章": a_soup = p_soup.find_next("ul").select('li >.txt-box > .p1 >a') for tag in a_soup: sql = r"insert into urls(href,title) value(" + "'" + tag.attrs['href'] + "'" + ',' + "'" + \ tag.attrs[ 'title'] + "'" + ")" print("热点文章sql: ", sql) self.cur.execute(sql) self.coon.commit() def Custom_hotspot_source_Text(self, text_list): # 自定义热点文本 a = "https://weixin.sogou.com/weixin?type=2&s_from=input&query=" for text in text_list: response = requests.get(a + text, headers=headers, proxies=self.proxies) print(response.text) def Custom_hotspot_source_Url(self, url_list): # 热点来源---链接 pass def Hot_event_sources_of_other_websites(self, list): # 选定某个网站作为外来热点来源-----写个爬虫去定向的爬取该网站的热点作为文本输入 pass def input_text(self, text_list): # 搜文章 a = "https://weixin.sogou.com/weixin?type=2&s_from=input&query=" for text in text_list: response = requests.get(a + text, headers=headers, proxies=self.proxies) print(response.text) def Crawling_article(self, text_list): # 搜公众号 a = "https://weixin.sogou.com/weixin?type=1&s_from=input&query=" for text in text_list: response = requests.get(a + text, headers=headers, proxies=self.proxies) print(response.text) def Hot(self, num=20, pd=0): # 分类--热门--提取相关文章链接 flag=False, 是否加载更多,默认加载20条,num=20,自定义加载多少条 pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) # ----------------------------- 开始 pc()函数为分类爬取中所需的常用函数,所以单独提出来。-------------------------------------- def pc(self, pn, pd): if pn >= 1: url = "https://weixin.sogou.com/pcindex/pc/pc_" + str(pd) + "/" + str(pn) + ".html" else: url = "https://weixin.sogou.com/pcindex/pc/pc_" + str(pd) + "/pc_" + str(pd) + ".html" print("url:", url) a = "li > .txt-box > h3 > a " res = requests.get(url, headers=headers, proxies=self.proxies).content.decode("utf8") self.soup = BeautifulSoup(res, 'lxml') a_soup = self.soup.select(a) print(a_soup) for tag in a_soup: sql = r"insert into urls(href,title) value(" + "'" + tag.attrs['href'] + "'" + ',' + "'" + \ tag.text + "'" + ")" print("分类--热门sql: ", sql) self.cur.execute(sql) self.coon.commit() # ---------------------------- 结束 pc() ---------------------------------------------------------------------------- def Funny(self, num=20, pd=1): print("Funny") pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) print("可以") def Honyaradoh(self, num=20, pd=2): pn = 0 if num > 20: pn = num / 20 for n in range(pn, pd): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Private_talk(self, num=20, pd=3): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Eight_trigrams(self, num=20, pd=4): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Technocrats(self, num=20, pd=5): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Financial_fans(self, num=20, pd=6): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Car_control(self, num=20, pd=7): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Life_home(self, num=20, pd=8): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Fashion_circle(self, num=20, pd=9): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Parenting(self, num=20, pd=10): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Travel(self, num=20, pd=11): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Workplace(self, num=20, pd=12): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def delicious_food(self, num=20, pd=13): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def history(self, num=20, pd=14): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def education(self, num=20, pd=15): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def constellation(self, num=20, pd=16): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Sports(self, num=20, pd=17): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def military(self, num=20, pd=18): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def game(self, num=20, pd=19): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def cute_pet(self, num=20, pd=20): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pn, pd) def Home_page_hot(self, num=20, pd=21): pn = 0 if num > 20: pn = num / 20 for n in range(pn): self.pc(n, pd) print("再一次执行这个函数") return "函数已执行完!" else: self.pc(pd)
线程池文件:thread_pool.py
#导包 from multiprocessing import cpu_count import threading import requests import queue import pymysql import time ''' pymysql创建一个coon数据库对象,将数据库中的重复链接去重(变唯一),把所有的链接拿出来放到队列中去(需要定义队列对象--初始化队列对象) 从队列中拿出数据分给线程去处理,注意:拿出数据的这个动作它是唯一的(就是只能每次一个线程去拿----加锁--锁定只能一个线程拿---线程锁) 编写线程函数--每个线程函数内容皆一样(都是去下载html页面,用到的是requests库),将数据放到数据库中-建表--方便后面数据清洗 ''' class Open_Thread_Assign_Links(): # 开启线程,分配链接 def __init__(self,proxies): # 请求头 self.headers = {'authority': 'weixin.sogou.com', 'method': 'GET', 'path': '/', 'scheme': 'https', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'accept-encoding': 'gzip, deflate, br', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'max-age=0', 'cookie': 'ABTEST=7|1619743404|v1; IPLOC=CN3205; SUID=5E9A50754018960A00000000608B52AC; SUID=5E9A5075AF21B00A00000000608B52AC; weixinIndexVisited=1; SUV=0087AD4475509A5E608B52ADF7A81201; SNUID=CD8A73542124E19CE9C1C01321869CD8; JSESSIONID=aaaUz7AAKRNv6gHXIfJGx', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36 SLBrowser/7.0.0.4071 SLBChan/15'} #创建锁对象 self.lock = threading.Lock() #确定线程数 self.Mul_Num() # 代理 self.proxies=proxies #创建队列 self.q = queue.Queue(500) # 从数据库中取出url数据 self.url_tuple = self.get() # 将链接压进队列 self.Push_Element_Queue(self.url_tuple) # 启动线程处理队列 self.Run_thread() def get(self): # 从数据库中取出url数据 #便于初始化类时从数据库中拿出数据,也方便后续界面用户发起的请求 coon = pymysql.connect(user='root', password='clly0528', db='gc') cur = coon.cursor() cur.execute('select * from urls') urls_list = cur.fetchall() # print("*:",urls_list) return urls_list def Push_Element_Queue(self, urls_list): for tuple_x in urls_list: self.q.put(tuple_x[1]) #将链接压进队列 # print("q:",self.q) def Run_thread(self): cut=0 for thread_id in range(self.mul_num): cut+=1 threading.Thread(target=self.spider,args=(self.q,cut)).start() print("已成功开启%d个线程" % self.mul_num) # 线程函数 def spider(self,lq,cut): print("开启线程 %d ...." % cut) while True: while lq.empty: # 加锁 self.lock.acquire() # 保证这个动作单一,同一时间只能由一个线程主管 url = lq.get(block=True, timeout=None) # 解锁 self.lock.release() # 这里获取到返回体,将返回体通过yield关键字传递给数据处理类。。 print("下载文章...") responese=requests.get(url=url,headers=self.headers,proxies=self.proxies) # 文章内容写入磁盘 Generate_file(responese) # 确定开启几个线程 def Mul_Num(self): # x: cpu实际运行时间 y: 线程等待时间 N = cpu_count() x = 1 y = 5 self.mul_num = int(N * (x + y) / x) # 将下载后的文章写入磁盘html_page中 def Generate_file(responese): print("文章写入文件夹") with open(r'C:\Users\Windows_pycharm\Desktop\G\图书管理系统\微信公众号爬取\html_page'+'\\'+time.strftime('%Y%m%d%H%M%S') + '.txt','w',encoding='utf-8') as f: f.write(responese.text)