pyuic5 download.ui -o download.py
pyrcc5 -o logo.py logo.qrc
# request_headers.py
import aiohttp
import asyncio
from fake_useragent import UserAgent
import os
import pickle
import time


def get_agents():
    """Build a de-duplicated list of random User-Agent strings.

    Draws 10001 random strings from fake_useragent, drops duplicates, prints
    how many distinct strings remain and returns them.

    :return: list of distinct User-Agent strings (order is not defined)
    """
    # A single generator is reused; each read of ``.random`` is a fresh draw,
    # so there is no need to rebuild the UserAgent object on every iteration.
    generator = UserAgent()
    agent_list = list({generator.random for _ in range(10001)})
    print(len(agent_list))
    return agent_list


async def send_request(useragent, url):
    """Request *url* with the given User-Agent and record the agent on HTTP 200.

    Agents that get a 200 response are appended to the module-level
    ``agent_ok`` list. Any failure is reported on stdout and otherwise ignored.

    :param useragent: User-Agent string to test
    :param url: URL to request
    """
    headers = {'user-agent': useragent}
    # ClientTimeout and ssl=False are the supported spellings of the
    # deprecated bare-number timeout and verify_ssl=False arguments.
    timeout = aiohttp.ClientTimeout(total=15)
    async with aiohttp.ClientSession() as session:
        try:
            print('正在测试: ', useragent)
            async with session.get(url=url, headers=headers, timeout=timeout, ssl=False) as response:
                print(response.status)
                if response.status == 200:
                    print('User-agent可用: ', useragent)
                    agent_ok.append(useragent)
                    await asyncio.sleep(1)
                else:
                    print('请求响应码不合法:', useragent)
        except Exception:
            # Best effort: a failing agent is simply not recorded.
            # ``Exception`` (not a bare except) lets task cancellation through.
            print('请求失败', useragent)


async def main():
    """Test every generated User-Agent concurrently against the module-level ``url``."""
    tasks = [send_request(agent, url) for agent in agents]
    # asyncio.wait() no longer accepts bare coroutines (TypeError on 3.11+);
    # gather() schedules them itself and also copes with an empty list.
    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    url = 'https://www.lofter.com/tag/TAG/new?page=1'
    agents = get_agents()
    agent_ok = []
    print('开始测试: ')
    try:
        asyncio.run(main())
    except Exception as err:
        print('发生错误:', err.args)
    print(agent_ok)
    # Make sure the target directory exists before writing the result.
    os.makedirs('./useragent', exist_ok=True)
    with open('./useragent/agent.pickle', 'wb') as f:
        pickle.dump(agent_ok, f)
from sys import exit
from random import choice
import logo

if __name__ == '__main__':
    # Your IP pool.
    # NOTE(review): only an 'http' entry is given while the request URLs
    # visible in this file are https -- confirm whether an 'https' entry is needed.
    proxies = {'http': 'http://XXX.XXX.XXX.X:XXXX'}

    # Your User-Agent strings; one of them is picked at random per run.
    agent_list = ['USERAGENT']
    useragent = choice(agent_list)
    headers = {'User-Agent': useragent}

    # Create the Qt application, show the main window and hand control to
    # the Qt event loop; its return code becomes the process exit code.
    app = QApplication([])
    download = Download()
    download.show()
    exit(app.exec_())
from time import localtime, strftime


def timestamp13_to_date(target_timestamp, date_format='%y%m%d-%H%M'):
    """
    Convert a millisecond timestamp into a formatted local-time string.

    :param target_timestamp: millisecond timestamp (13 digits); any value accepted by ``int()``
    :param date_format: ``strftime`` format of the result
    :return: the formatted time string
    """
    # Milliseconds -> seconds; true division already yields a float.
    seconds = int(target_timestamp) / 1000
    return strftime(date_format, localtime(seconds))
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *


class MySignals(QObject):
    """Holder for the custom signal that carries one line of text to the GUI."""

    text_print = pyqtSignal(str)


class Download(QMainWindow):
    """Main window: wires the start buttons to their handlers and the signals to the log panes."""

    def __init__(self):
        super().__init__()

        # Build the interface from the UI class and initialise it on this window.
        # NOTE(review): Ui_MainWindow is not imported in the visible part of the
        # file -- presumably it comes from the pyuic5-generated module; confirm.
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        # One signal object per log pane, each connected to its own slot.
        # NOTE(review): handleCalc_3 emits on self.ms_3, which is not created
        # here -- confirm where it is set up.
        self.ms = MySignals()
        self.ms_2 = MySignals()
        self.ms.text_print.connect(self.printToGui)
        self.ms_2.text_print.connect(self.printToGui_2)

        # Buttons start the corresponding handlers.
        self.ui.pushButton.clicked.connect(self.handleCalc)
        self.ui.pushButton_2.clicked.connect(self.handleCalc_2)

    def printToGui(self, text):
        """Append *text* to ``textBrowser`` and keep the cursor visible."""
        browser = self.ui.textBrowser
        browser.append(text)
        browser.ensureCursorVisible()

    def printToGui_2(self, text):
        """Append *text* to ``textBrowser_2`` and keep the cursor visible."""
        browser = self.ui.textBrowser_2
        browser.append(text)
        browser.ensureCursorVisible()
from threading import Thread from lxml.etree import HTML from time import sleep from os.path import isfile from os import getcwd from requests import get from selectolax.parser import HTMLParser from re import sub from concurrent.futures import ThreadPoolExecutor from random import uniform
def handleCalc(self):
    """Start a background crawl that downloads the images of a tag's posts.

    Reads the tag name, cutoff date, minimum hotness and blocked words from
    the UI, then walks ``https://www.lofter.com/tag/<tag>/new?page=N`` page by
    page in a daemon thread. Every image of every qualifying post is saved
    into the current working directory as ``<date>_<nickname>_(<k>).jpg``.
    Progress lines are emitted through ``self.ms.text_print``.

    Crawling stops at the first image post older than the cutoff date, or
    when a page has no "next page" element.

    Uses the module-level ``headers`` and ``proxies`` and the helper
    ``timestamp13_to_date``.
    """
    # Widgets are read here, in the thread that invoked the handler, rather
    # than from the worker thread: Qt widgets must only be touched from the
    # GUI thread.
    tag = self.ui.plainTextEdit.toPlainText()  # tag name
    end_date = self.ui.dateEdit.date().toString('yyMMdd')  # cutoff date
    shield_tag = self.ui.plainTextEdit_3.toPlainText().split(",")  # blocked words, comma separated
    try:
        min_hot = int(self.ui.plainTextEdit_2.toPlainText())  # minimum hotness
    except ValueError:
        # Previously this raised inside the worker thread, which then died
        # without telling the user anything.
        self.ms.text_print.emit('筛选热度必须是整数')
        return

    def fetch(url, **kwargs):
        """GET *url*, retrying every 10 seconds until the request succeeds."""
        while True:
            try:
                return get(url=url, **kwargs)
            except Exception:
                sleep(10)

    def get_pic(page):
        """Process one listing page; return True when crawling should stop."""
        r_text = fetch(f'https://www.lofter.com/tag/{tag}/new?page={page}',
                       headers=headers, proxies=proxies).text
        sleep(uniform(0, 0.1))
        for p in HTML(r_text).xpath('//*[@id="main"]/div[@data-blogid]'):
            # Only image posts are of interest.
            if not p.xpath('./div[2]/div/div[2]/div[2]/div/div/div[@class="img"]'):
                continue
            timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
            date = timestamp13_to_date(timestamp)
            # The listing is assumed to be newest-first, so the first post
            # older than the cutoff ends the whole crawl.
            if date[:6] < end_date:
                return True
            try:
                hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                if not hot:
                    hot = '0'
            except IndexError:
                hot = '0'
            # Hotness filter.
            if int(hot) < min_hot:
                continue
            pic_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
            # Skip posts carrying any blocked word as a tag.
            if any(word in pic_tag for word in shield_tag):
                continue
            url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
            html = HTML(fetch(url, headers=headers, proxies=proxies).text)
            pic = html.xpath('//*[@imggroup="gal"]/img/@src')
            # Replace characters that are not allowed in file names.
            name = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", p.xpath('./@data-blognickname')[0])
            for k, src in enumerate(pic, start=1):
                title = f'{date}_{name}_({k}).jpg'
                upath = f'{getcwd()}/{title}'
                if isfile(upath):
                    self.ms.text_print.emit('已存在:' + title)
                    continue
                # The query string is dropped from the image URL before download.
                image = fetch(src.split("?")[0]).content
                with open(upath, 'wb') as f:
                    f.write(image)
                self.ms.text_print.emit(title)
        # No "next page" element means the last page has been crawled.
        # NOTE(review): '\000' in this non-raw literal is a Python octal
        # escape (a NUL character) -- confirm the intended selector.
        if not HTMLParser(r_text).css_first('span.w-iar2\000r'):
            return True
        return False

    def run():
        self.ms.text_print.emit('正在运行,请稍等')
        try:
            with ThreadPoolExecutor() as pool:
                # Each page is awaited before the next one is submitted,
                # because the stop decision depends on the page's content.
                for page in range(1, 100000000):
                    if pool.submit(get_pic, page).result():
                        break
            self.ms.text_print.emit('END')
        # Raised when the window is closed while the crawl is running.
        except RuntimeError:
            pass

    # ``daemon=True`` replaces the deprecated ``setDaemon(True)``.
    t = Thread(target=run, daemon=True)
    t.start()
def handleCalc_2(self):
    """Start a background crawl that saves the text posts of a tag as .txt files.

    Reads the tag name, cutoff date, minimum hotness and blocked words from
    the UI, then walks ``https://www.lofter.com/tag/<tag>/new?page=N`` page by
    page in a daemon thread. Every qualifying text post is written to
    ``<date>_<nickname>_<title>.txt`` in the current working directory, with a
    header line (hotness, tags, date, source link) followed by the body text.
    Progress lines are emitted through ``self.ms_2.text_print``.

    Crawling stops at the first text post older than the cutoff date, or when
    a page has no "next page" element.

    Uses the module-level ``headers`` and ``proxies`` and the helper
    ``timestamp13_to_date``.
    """
    # Widgets are read here, in the thread that invoked the handler, rather
    # than from the worker thread: Qt widgets must only be touched from the
    # GUI thread.
    tag = self.ui.plainTextEdit_4.toPlainText()  # tag name
    end_date = self.ui.dateEdit_2.date().toString('yyMMdd')  # cutoff date
    shield_tag = self.ui.plainTextEdit_6.toPlainText().split(",")  # blocked words, comma separated
    try:
        min_hot = int(self.ui.plainTextEdit_5.toPlainText())  # minimum hotness
    except ValueError:
        # Previously this raised inside the worker thread, which then died
        # without telling the user anything.
        self.ms_2.text_print.emit('筛选热度必须是整数')
        return

    # Containers in which the post body may appear, in priority order; the
    # first selector that matches anything wins.
    # NOTE(review): '\000' in these non-raw literals is a Python octal escape
    # (a NUL character) -- confirm the intended selectors.
    body_selectors = (
        'div[class=content]',
        'div[class=txtcont]',
        'div[class=contt]',
        'div[class=cnt\000box]',
        'div[class=detail-ct]',
        'div[class=post-ct]',
        'div[class=listitm\000regular]',
        'div[class=ctc\000box]',
        'div[class=icontent]',
        'div[class=textc]',
        'div[class=postdesc]',
        'div[class=cnttxt]',
        'div[class=posttext]',
        'div[class=text]',
        'div[class=cont]',
    )

    def fetch(url, **kwargs):
        """GET *url*, retrying every 10 seconds until the request succeeds."""
        while True:
            try:
                return get(url=url, **kwargs)
            except Exception:
                sleep(10)

    def get_art(page):
        """Process one listing page; return True when crawling should stop."""
        r2_text = fetch(f'https://www.lofter.com/tag/{tag}/new?page={page}',
                        headers=headers, proxies=proxies).text
        sleep(uniform(0, 0.1))
        for p in HTML(r2_text).xpath('//*[@id="main"]/div[@data-blogid]'):
            # Only text posts are of interest.
            if not p.xpath('./div[2]/div/div[2]/div[2]/div/div[1]/div[@class="txt js-digest ptag"]'):
                continue
            timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
            date = timestamp13_to_date(timestamp)
            # The listing is assumed to be newest-first, so the first post
            # older than the cutoff ends the whole crawl.
            if date[:6] < end_date:
                return True
            try:
                hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                if not hot:
                    hot = '0'
            except IndexError:
                hot = '0'
            # Hotness filter.
            if int(hot) < min_hot:
                continue
            art_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
            # Skip posts carrying any blocked word as a tag.
            if any(word in art_tag for word in shield_tag):
                continue
            # Post title, with a placeholder for untitled posts.
            try:
                tit = p.xpath('./div[2]/div/div[2]/div[2]/div/h2/text()')[0]
            except IndexError:
                tit = '无题'
            name = p.xpath('./@data-blognickname')[0]
            # Replace characters that are not allowed in file names.
            title = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", f'{date}_{name}_{tit}.txt')
            upath = f'{getcwd()}/{title}'
            if isfile(upath):
                self.ms_2.text_print.emit('已存在:' + title)
                continue
            url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
            r3 = fetch(url, headers=headers, proxies=proxies)
            parser = HTMLParser(r3.text.replace('<br />', '\n'))
            res = []
            for selector in body_selectors:
                res = parser.css(selector)
                if res:
                    break
            with open(upath, 'a+', encoding='utf-8') as f:
                # The raw pattern r'\xa0' matches the four literal characters
                # backslash-x-a-0, which is how the list repr of ``art_tag``
                # renders a non-breaking space.
                f.write(f'热度:{hot} tag:{art_tag} 日期:{date[:6]} 原文链接:{url}\n'.replace(r'\xa0', ' '))
                content = ''.join(
                    node.text(deep=True, separator='', strip=False) + '\n'
                    for block in res
                    for node in HTMLParser(block.html).css('p')
                )
                # Fall back to the container text when there are no <p> tags.
                if not content:
                    content = ''.join(
                        node.text(deep=True, separator='', strip=False) + '\n'
                        for node in res
                    )
                f.write(content.replace('\n\n\n', '\n\n').replace('\n\n\n\n', '\n\n\n'))
            self.ms_2.text_print.emit(title)
        # No "next page" element means the last page has been crawled.
        if not HTMLParser(r2_text).css_first('span.w-iar2\000r'):
            return True
        return False

    def run_2():
        self.ms_2.text_print.emit('正在运行,请稍等')
        try:
            with ThreadPoolExecutor() as pool:
                # Each page is awaited before the next one is submitted,
                # because the stop decision depends on the page's content.
                for page in range(1, 100000000):
                    if pool.submit(get_art, page).result():
                        break
            self.ms_2.text_print.emit('END')
        # Raised when the window is closed while the crawl is running.
        except RuntimeError:
            pass

    # ``daemon=True`` replaces the deprecated ``setDaemon(True)``.
    t_2 = Thread(target=run_2, daemon=True)
    t_2.start()
from pandas import DataFrame, concat
def handleCalc_3(self):
    """Start a background job that collects monthly statistics for a list of tags.

    Reads the comma-separated tag names, the blocked words and the start/end
    month (``yyMM``) from the UI. For every tag it fetches the total shown on
    the tag page, then walks the "new" listing and, for every month in the
    requested range, counts the posts and records the highest hotness. The
    result is emitted as text and written to
    ``新建 Microsoft Excel 工作表.xlsx``.

    Progress lines are emitted through ``self.ms_3.text_print``.
    NOTE(review): ``self.ms_3`` is not created in the visible ``__init__`` and
    no button is connected to this handler there -- confirm where that is done.

    Uses the module-level ``headers`` and ``proxies`` and the helper
    ``timestamp13_to_date``.
    """
    # Widgets are read here, in the thread that invoked the handler, rather
    # than from the worker thread: Qt widgets must only be touched from the
    # GUI thread.
    top_cp = self.ui.plainTextEdit_7.toPlainText().split(",")
    shield_tag = self.ui.plainTextEdit_8.toPlainText().split(",")
    start = self.ui.dateEdit_3.date().toString('yyMM')
    end = self.ui.dateEdit_4.date().toString('yyMM')

    def fetch(url):
        """GET *url*, retrying every 10 seconds until the request succeeds."""
        while True:
            try:
                return get(url=url, headers=headers, proxies=proxies)
            except Exception:
                sleep(10)

    def total_tag(cp):
        """Return the total shown in the header of the tag page of *cp* (as text)."""
        html = HTML(fetch(f'https://www.lofter.com/tag/{cp}').text)
        header_text = html.xpath(
            '//*[@id="tagpageheader"]/div/div/div[2]/div[1]/div[2]/div[1]/div[2]/text()')[0]
        # The figure sits between the words '浏览' and '参与'.
        total = header_text.split('浏览')[1].split('参与')[0].strip()
        self.ms_3.text_print.emit('tag总数:' + total)
        return total

    def single_cp(cp, all_month):
        """Return (posts per month, highest hotness per month) for tag *cp*.

        Both lists are aligned with *all_month* (newest month first).
        """
        month_index = {ym: idx for idx, ym in enumerate(all_month)}
        hots = [[] for _ in all_month]
        new = [0] * len(all_month)
        for page in range(1, 100000):
            self.ms_3.text_print.emit('page:' + str(page))
            r5_text = fetch(f'https://www.lofter.com/tag/{cp}/new?page={page}').text
            sleep(0.5)
            reached_start = False
            for p in HTML(r5_text).xpath('//*[@id="main"]/div[@data-blogid]'):
                tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                # Skip posts carrying any blocked word as a tag.
                if any(word in tag for word in shield_tag):
                    continue
                timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                month = timestamp13_to_date(timestamp)[:4]  # 'yyMM'
                # Newer than the requested range: ignore the post.
                if month > all_month[0]:
                    continue
                # Older than the requested range: nothing further to collect.
                if month < all_month[-1]:
                    reached_start = True
                    break
                idx = month_index.get(month)
                if idx is None:
                    continue
                self.ms_3.text_print.emit('date:' + month)
                try:
                    hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                    if not hot:
                        hot = '0'
                except IndexError:
                    hot = '0'
                hots[idx].append(int(hot))
                new[idx] += 1
            if reached_start:
                break
            # No "next page" element means the last page has been crawled.
            # NOTE(review): '\000' in this non-raw literal is a Python octal
            # escape (a NUL character) -- confirm the intended selector.
            if not HTMLParser(r5_text).css_first('span.w-iar2\000r'):
                break
        max_hot = []
        for idx, ym in enumerate(all_month):
            # A month without posts reports a highest hotness of 0.
            max_hot.append(max(hots[idx], default=0))
            self.ms_3.text_print.emit('时间:' + ym)
            self.ms_3.text_print.emit('月度新增:' + str(new[idx]))
            self.ms_3.text_print.emit('最高热度:' + str(max_hot[idx]))
        return new, max_hot

    def download_history(all_month):
        """Collect the statistics of every tag and save them as an Excel sheet."""
        data = {}
        for cp in top_cp:
            self.ms_3.text_print.emit('统计中,请稍等:' + cp)
            total = total_tag(cp)
            new, max_hot = single_cp(cp, all_month)
            data[cp] = [[cp, total, new[i], max_hot[i]] for i in range(len(all_month))]
        # One frame per month (rows: tags), placed side by side under the
        # month as the top column level.
        dfs = []
        for i in range(len(all_month)):
            df = DataFrame([rows[i] for rows in data.values()],
                           columns=['tag名称', 'tag总数', '月度新增', '最高热度'])
            df.set_index(['tag名称', 'tag总数'], inplace=True)
            dfs.append(df)
        frame = concat(dfs, keys=all_month, axis=1)
        frame.columns.names = ['时间', '数据']
        self.ms_3.text_print.emit('result:')
        self.ms_3.text_print.emit(str(frame))
        frame.to_excel('新建 Microsoft Excel 工作表.xlsx')
        self.ms_3.text_print.emit('已保存至:新建 Microsoft Excel 工作表.xlsx')

    def run_3():
        if int(start) > int(end):
            self.ms_3.text_print.emit('请重新输入时间')
        else:
            # Every 'yyMM' between start and end, newest first. Years are
            # zero-padded so that they compare correctly with the 'yyMM'
            # prefix of the formatted post dates.
            years = range(int(end[:2]), int(start[:2]) - 1, -1)
            candidates = [f'{y:02d}{m:02d}' for y in years for m in range(12, 0, -1)]
            all_month = [ym for ym in candidates if int(start) <= int(ym) <= int(end)]
            download_history(all_month)
        self.ms_3.text_print.emit('END')

    # ``daemon=True`` replaces the deprecated ``setDaemon(True)``.
    t_3 = Thread(target=run_3, daemon=True)
    t_3.start()