Java教程

Lofter存档助手

本文主要是介绍Lofter存档助手,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

声明:该代码仅为个人业余练习产物,供爱好者保存tag下图文到本地作为收藏,不得用于商业用途、侵犯他人著作权。

效果演示见https://www.bilibili.com/video/BV18h411v7kN拖到最后五分钟

一、准备工作 

本工具使用PyQt5开发图形界面,这一步不具体展开记录。之后导出ui文件,然后转换为py文件。在命令行输入

pyuic5 download.ui -o download.py

即可完成转换。

图形界面中的windowIcon要单独导出为qrc文件,然后转换为py文件。在命令行输入

pyrcc5 -o logo.py logo.qrc

 即可完成转换。

如果频繁访问,会被网易的反爬虫机制制裁,需要事先设定多个User-Agent和ip池来模拟不同用户浏览器(做了这步还是可能会被封ip,我也不知道为什么)。用fake_useragent库异步测试哪些User-Agent可以用来爬Lofter。

# request_headers.py
import aiohttp
import asyncio
from fake_useragent import UserAgent
import pickle
import time

def get_agents():
    agent_list = []
    n = 0
    while n <= 10000:
        agent = UserAgent().random
        agent_list.append(agent)
        n += 1
    agent_list = list(set(agent_list))
    print(len(agent_list))
    return agent_list

async def send_request(useragent, url):
    """请求数据"""
    async with aiohttp.ClientSession() as session:
        try:
            headers = {
                'user-agent': useragent
            }
            print('正在测试: ', useragent)
            async with session.get(url=url, headers=headers, timeout=15, verify_ssl=False) as response:
                print(response.status)
                if response.status == 200:
                    print('User-agent可用: ', useragent)
                    agent_ok.append(useragent)
                    await asyncio.sleep(1)
                else:
                    print('请求响应码不合法:', useragent)
        except:
            print('请求失败', useragent)

async def main():
    tasks = [send_request(agent, url) for agent in agents]
    await asyncio.wait(tasks)

if __name__ == '__main__':
    url = 'https://www.lofter.com/tag/TAG/new?page=1'
    agents = get_agents()
    agent_ok = []
    print('开始测试: ')
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except Exception as err:
        print('发生错误:', err.args)

    print(agent_ok)
    with open('./useragent/agent.pickle', 'wb') as f:
        pickle.dump(agent_ok, f)

ip池的获取与之类似。这里我是参考一位大佬的方法。

回到download.py,主函数:

from sys import exit
from random import choice
import logo

if __name__ == '__main__':
    agent_list = ['USERAGENT']    # 你的 User-Agent
    useragent = choice(agent_list)
    headers = {
        'User-Agent': useragent
    }
    proxies = {
        'http': 'http://XXX.XXX.XXX.X:XXXX'
    }    # 你的 ip 池

    app = QApplication([])
    download = Download()
    download.show()
    exit(app.exec_())

 二、现在考虑这个工具需要具备的功能。 

以前用过一位大佬制作的存文工具,其文件命名为“作者名+标题”,实际上会漏掉一些标题相同(或没有标题)的文章。发布时间是以时间戳的形式呈现的,命名规则中加上时间能尽可能地减少遗漏。

from time import localtime, strftime

def timestamp13_to_date(target_timestamp, date_format='%y%m%d-%H%M'):
    """
    将毫秒时间戳转换为时间字符串。
    :param target_timestamp: 毫秒时间戳(13位数字)
    :param date_format: 时间字符串格式
    :return: 时间字符串
    """
    timestamp13 = localtime(float(int(target_timestamp) / 1000))
    standard_date = strftime(date_format, timestamp13)
    return standard_date

在爬取过程中,需要将日志以字符串形式显示在QTextBrowser中。自定义一个信号类。

自定义一个Download类。将两个pushButton绑到两个函数上,点击pushButton就能开始执行下载图、文的任务。

from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *

class MySignals(QObject):
    text_print = pyqtSignal(str)

class Download(QMainWindow):

    def __init__(self):
        super().__init__()
        # 使用ui文件导入定义界面类
        self.ui = Ui_MainWindow()
        # 初始化界面
        self.ui.setupUi(self)
        self.ui.pushButton.clicked.connect(self.handleCalc)
        # MySignals()类的实例对象
        self.ms = MySignals()
        # 自定义信号的处理函数
        self.ms.text_print.connect(self.printToGui)

        self.ui.pushButton_2.clicked.connect(self.handleCalc_2)
        self.ms_2 = MySignals()
        self.ms_2.text_print.connect(self.printToGui_2)

    def printToGui(self, text):
        self.ui.textBrowser.append(text)
        self.ui.textBrowser.ensureCursorVisible()

    def printToGui_2(self, text):
        self.ui.textBrowser_2.append(text)
        self.ui.textBrowser_2.ensureCursorVisible()

 采用Requests库请求数据,xpath和selectolax定位。selectolax定位速度快,可以做到正文内容、排版跟原来一样。Selectolax库的Github地址

爬虫是I/O 密集型任务。线程池官方文档

from threading import Thread
from lxml.etree import HTML
from time import sleep
from os.path import isfile
from os import getcwd
from requests import get
from selectolax.parser import HTMLParser
from re import sub
from concurrent.futures import ThreadPoolExecutor
from random import uniform

图:

def handleCalc(self):
    def run():
        tag = self.ui.plainTextEdit.toPlainText()    # tag 名称
        end_date = self.ui.dateEdit.date().toString('yyMMdd')    # 截止年月日
        min_hot = int(self.ui.plainTextEdit_2.toPlainText())    # 筛选热度
        shield_tag = self.ui.plainTextEdit_3.toPlainText().split(",")    # 屏蔽词,以逗号隔开
        self.ms.text_print.emit('正在运行,请稍等')

        def get_pic(page):
            # while loop 是为了避免特殊原因爬不到的情况
            n = 0
            while n == 0:
                try:
                    r_text = get(url=f'https://www.lofter.com/tag/{tag}/new?page={page}', headers=headers, proxies=proxies).text
                    break
                except:
                    sleep(10)
                    pass
            sleep(uniform(0, 0.1))
            page_html = HTML(r_text)
            div_list = page_html.xpath('//*[@id="main"]/div[@data-blogid]')
            for p in div_list:
                # 是否是图片产出
                if p.xpath('./div[2]/div/div[2]/div[2]/div/div/div[@class="img"]'):
                    timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                    date = timestamp13_to_date(timestamp)
                    # 是否满足截止日期
                    if date[:6] >= end_date:
                        try:
                            hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                            if not hot:
                                hot = '0'
                        except IndexError:
                            hot = '0'
                        # 是否满足热度
                        if int(hot) >= min_hot:
                            pic_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                            # 是否包含屏蔽词
                            if [i for i in shield_tag if i in pic_tag]:
                                pass
                            else:
                                url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
                                while n == 0:
                                    try:
                                        r1 = get(url=url, headers=headers, proxies=proxies)
                                        break
                                    except:
                                        sleep(10)
                                        pass
                                 html = HTML(r1.text)
                                 pic = html.xpath('//*[@imggroup="gal"]/img/@src')
                                 name = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", p.xpath('./@data-blognickname')[0])
                                 for k in range(len(pic)):
                                    title = f'{date}_{name}_({str(k+1)}).jpg'
                                    upath = f'{getcwd()}/{title}'
                                    if isfile(upath) is True:
                                        self.ms.text_print.emit('已存在:' + title)
                                    else:
                                        while n == 0:
                                            try:
                                                image = get(pic[k].split("?")[0]).content
                                                break
                                            except:
                                                sleep(10)
                                                pass
                                        with open(upath, 'wb') as f:
                                            f.write(image)
                                        self.ms.text_print.emit(title)
                        else:
                            pass
                    else:
                        return True
                else:
                    pass
            # 如果没有下一页按钮,说明最后一页已经爬取完毕
            if not HTMLParser(r_text).css_first('span.w-iar2\000r'):
                return True

        try:
            with ThreadPoolExecutor() as pool:
                for page in range(1, 100000000):
                    future = pool.submit(get_pic, page)
                    if future.result():
                        break
            self.ms.text_print.emit('END')
        # 关闭窗口异常
        except RuntimeError:
            pass

    t = Thread(target=run)
    t.setDaemon(True)
    t.start()

文:

由于文章页存在很多模板,正文可能会存在于各种标签下,我自己手动找到了15种,应该能覆盖99%(

def handleCalc_2(self):
    def run_2():
        tag = self.ui.plainTextEdit_4.toPlainText()
        end_date = self.ui.dateEdit_2.date().toString('yyMMdd')
        min_hot = int(self.ui.plainTextEdit_5.toPlainText())
        shield_tag = self.ui.plainTextEdit_6.toPlainText().split(",")
        self.ms_2.text_print.emit('正在运行,请稍等')

        def get_art(page):
            n = 0
            while n == 0:
                try:
                    r2_text = get(url=f'https://www.lofter.com/tag/{tag}/new?page={page}', headers=headers,
                                  proxies=proxies).text
                    break
                except:
                    sleep(10)
                    pass
            sleep(uniform(0, 0.1))
            page_html = HTML(r2_text)
            div_list = page_html.xpath('//*[@id="main"]/div[@data-blogid]')
            for p in div_list:
                # 是否是文章产出
                if p.xpath('./div[2]/div/div[2]/div[2]/div/div[1]/div[@class="txt js-digest ptag"]'):
                    timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                    date = timestamp13_to_date(timestamp)
                    # 是否满足截止日期
                    if date[:6] >= end_date:
                        try:
                            hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                            if not hot:
                                hot = '0'
                        except IndexError:
                            hot = '0'
                        # 是否满足热度
                        if int(hot) >= min_hot:
                            art_tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                            # 是否包含屏蔽词
                            if [i for i in shield_tag if i in art_tag]:
                                pass
                            else:
                                # 获取文章标题
                                try:
                                    tit = p.xpath('./div[2]/div/div[2]/div[2]/div/h2/text()')[0]
                                except IndexError:
                                    tit = '无题'
                                name = p.xpath('./@data-blognickname')[0]
                                title = sub(r"[\/\\\:\*\?\"\<\>\|\\\n]", "-", f'{date}_{name}_{tit}.txt')
                                upath = f'{getcwd()}/{title}'
                                if isfile(upath) is True:
                                    self.ms_2.text_print.emit('已存在:' + title)
                                else:
                                    url = p.xpath('./div[2]/div/div[1]/a/@href')[0]
                                    while n == 0:
                                        try:
                                            r3 = get(url=url, headers=headers, proxies=proxies)
                                            break
                                        except:
                                            sleep(10)
                                            pass
                                    # 正文有可能出现的标签
                                    parser = HTMLParser(r3.text.replace('<br />', '\n'))
                                    res_1 = parser.css('div[class=content]')
                                    res_2 = parser.css('div[class=txtcont]')
                                    res_3 = parser.css('div[class=contt]')
                                    res_4 = parser.css('div[class=cnt\000box]')
                                    res_5 = parser.css('div[class=detail-ct]')
                                    res_6 = parser.css('div[class=post-ct]')
                                    res_7 = parser.css('div[class=listitm\000regular]')
                                    res_8 = parser.css('div[class=ctc\000box]')
                                    res_9 = parser.css('div[class=icontent]')
                                    res_10 = parser.css('div[class=textc]')
                                    res_11 = parser.css('div[class=postdesc]')
                                    res_12 = parser.css('div[class=cnttxt]')
                                    res_13 = parser.css('div[class=posttext]')
                                    res_14 = parser.css('div[class=text]')
                                    res_15 = parser.css('div[class=cont]')
                                    res = res_1 if res_1 else res_2 if res_2 else res_3 if res_3 else res_4 if res_4 else res_5 if res_5 else res_6 if res_6 else res_7 if res_7 else res_8 if res_8 else res_9 if res_9 else res_10 if res_10 else res_11 if res_11 else res_12 if res_12 else res_13 if res_13 else res_14 if res_14 else res_15 if res_15 else []
                                    content = ''
                                    with open(upath, 'a+', encoding='utf-8') as f:
                                        f.write(f'热度:{hot} tag:{art_tag} 日期:{date[:6]} 原文链接:{url}\n'.replace(r'\xa0', ' '))  # 原生字符 \xa0 转换为 空格
                                        for j in res:
                                            for node in HTMLParser(j.html).css('p'):
                                                content += node.text(deep=True, separator='', strip=False) + '\n'
                                        # 没有'p'标签的情况
                                        if not content:
                                            for node in res:
                                                content += node.text(deep=True, separator='', strip=False) + '\n'
                                        f.write(content.replace('\n\n\n', '\n\n').replace('\n\n\n\n', '\n\n\n'))
                                    self.ms_2.text_print.emit(title)
                        else:
                            pass
                    else:
                        return True
                else:
                    pass
            if not HTMLParser(r2_text).css_first('span.w-iar2\000r'):
                return True

        try:
            with ThreadPoolExecutor() as pool:
                for page in range(1, 100000000):
                    future = pool.submit(get_art, page)
                    if future.result():
                        break
            self.ms_2.text_print.emit('END')
        except RuntimeError:
            pass

    t_2 = Thread(target=run_2)
    t_2.setDaemon(True)
    t_2.start()

三、不足

还存在几个小问题。一是Lofter网页版本身有时间线混乱的问题,这种情况比较少见,一旦出现会引起严重强迫症。二是。。(改了再说)

四、拓展

一个没什么用的功能,输入多个tag名称、屏蔽tag、起止年月,可以生成一个用来对比这些tag的各月新增趋势的Excel表格。

from pandas import DataFrame, concat

 Download类下加上这个函数:

def handleCalc_3(self):
    def run_3():
        top_cp = self.ui.plainTextEdit_7.toPlainText().split(",")
        shield_tag = self.ui.plainTextEdit_8.toPlainText().split(",")
        start = self.ui.dateEdit_3.date().toString('yyMM')
        end = self.ui.dateEdit_4.date().toString('yyMM')

        def total_tag(cp):
            n = 0
            while n == 0:
                try:
                    r4 = get(url=f'https://www.lofter.com/tag/{cp}', headers=headers, proxies=proxies)
                    break
                except:
                    sleep(10)
                    pass
            html = HTML(r4.text)
            total = html.xpath('//*[@id="tagpageheader"]/div/div/div[2]/div[1]/div[2]/div[1]/div[2]/text()')[0].split('浏览')[1].split('参与')[0].strip()
            self.ms_3.text_print.emit('tag总数:' + total)
            return total

        def single_cp(cp):
            hots = [[] for i in res]
            new = [0 for i in res]
            n = 0
            for page in range(1, 100000):
                self.ms_3.text_print.emit('page:' + str(page))
                while n == 0:
                    try:
                        r5_text = get(url=f'https://www.lofter.com/tag/{cp}/new?page={page}', headers=headers, proxies=proxies).text
                        break
                    except:
                        sleep(10)
                        pass
                sleep(0.5)
                page_html = HTML(r5_text)
                div_list = page_html.xpath('//*[@id="main"]/div[@data-blogid]')
                for p in div_list:
                    tag = p.xpath('./div[2]/div/div[2]/div[3]/div[1]/span/a/span/text()')
                    if [i for i in shield_tag if i in tag]:
                        pass
                    else:
                        timestamp = p.xpath('./div[2]/div/div[1]/a/@data-time')[0]
                        date = timestamp13_to_date(timestamp)
                        for j in res:
                            if date[:4] > all_month[0]:
                                break
                            if date[:4] == all_month[j]:
                                self.ms_3.text_print.emit('date:' + date[:4])
                                try:
                                    hot = p.xpath('./div[2]/div/div[2]/div[3]/div[2]/span[1]/a/text()')[0][3:-1]
                                    if not hot:
                                        hot = '0'
                                except IndexError:
                                    hot = '0'
                                hots[j].append(int(hot))
                                new[j] += 1
                                break
                            if date[:-4] < all_month[-1]:
                                page = -1
                                break
                            else:
                                pass
                        if page == -1:
                            break
                if page == -1:
                    break
                if not HTMLParser(r5_text).css_first('span.w-iar2\000r'):
                    break
            max_hot = []
            for i in res:
                if not hots[i] and not new[i]:
                    max_hot.append(0)
                else:
                    max_hot.append(max(hots[i]))
                self.ms_3.text_print.emit('时间:' + all_month[i])
                self.ms_3.text_print.emit('月度新增:' + str(new[i]))
                self.ms_3.text_print.emit('最高热度:' + str(max_hot[i]))
            return new, max_hot

        def download_history():
            total_list = {}
            dfs = []
            data = {}
            for cp in top_cp:
                self.ms_3.text_print.emit('统计中,请稍等:' + cp)
                total_list[cp] = total_tag(cp)
                new, max_hot = single_cp(cp=cp)
                data[cp] = [[cp, total_list[cp], new[i], max_hot[i]] for i in res]
            for i in res:
                df = DataFrame([list(data.values())[j][i] for j in range(len(data))], columns=['tag名称', 'tag总数', '月度新增', '最高热度'])
                df.set_index(['tag名称', 'tag总数'], inplace=True)
                dfs.append(df)
            frame = concat(dfs, keys=all_month, axis=1)
            frame.columns.names = ['时间', '数据']
            self.ms_3.text_print.emit('result:')
            self.ms_3.text_print.emit(str(frame))
            frame.to_excel('新建 Microsoft Excel 工作表.xlsx')
            self.ms_3.text_print.emit('已保存至:新建 Microsoft Excel 工作表.xlsx')

        if int(start) > int(end):
            self.ms_3.text_print.emit('请重新输入时间')
        else:
            year = list(range(int(end[:2]), int(start[:2]) - 1, -1))
            month = ['12', '11', '10', '09', '08', '07', '06', '05', '04', '03', '02', '01']
            all_month = [str(i) + j for i in year for j in month]
            all_month = [i for i in all_month if int(start) <= int(i) <= int(end)]
            res = range(len(all_month))
            download_history()
            self.ms_3.text_print.emit('END')

    t_3 = Thread(target=run_3)
    t_3.setDaemon(True)
    t_3.start()

做出来表头长这样:

这篇关于Lofter存档助手的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!