Earlier we covered the Scrapy framework, distributed Scrapy, Scrapy-Splash, and how Bloom filtering works. Now here is a new requirement: what if we need to use all of these features at once?
In this section we roll up our sleeves, read the framework and module source code, and wire these features together by hand, building a spider that combines the scrapy_redis distributed scheduler, a Bloom filter for deduplication, and Splash for JavaScript rendering. The figure below shows what we are going to build; the parts inside the red box are the ones that need special handling.
Modules involved:
① scrapy_redis: normally provides the scheduler for distributed crawling. That scheduler handles both deduplication and scheduling; here we only use its scheduling part.
② scrapy_splash: used to send requests to the Splash server.
③ scrapy_redis_bloomfilter: adds Bloom-filter deduplication on top of scrapy_redis.
Each of these modules expects its own DUPEFILTER_CLASS to be configured in settings.py:
```python
DUPEFILTER_CLASS = 'scrapy_splash.SplashAwareDupeFilter'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
DUPEFILTER_CLASS = 'scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter'
```
We want all three features, but we obviously cannot set all three values at the same time. So we write our own dupe filter that merges the behaviour of the three, and then point DUPEFILTER_CLASS at it.
Requirements:
① Requests are Splash requests.
② The scheduler comes from scrapy_redis, so the crawl can run distributed.
③ Deduplication uses a Bloom filter, replacing the default filter in scrapy_redis.
As we saw from the source code above, both the scrapy_redis filter and the Bloom filter inherit from Scrapy's dupe filter and override its methods. Our goal is to keep the distributed setup, run the Bloom filter on top of it, and also use scrapy_splash's request fingerprinting, which can be done as follows.
```python
# news/dupefilter.py
from copy import deepcopy
import logging
import time

from scrapy.utils.request import request_fingerprint
from scrapy.utils.url import canonicalize_url
from scrapy_redis.connection import get_redis_from_settings
from scrapy_redis.dupefilter import RFPDupeFilter as BaseRFPDupeFilter
from scrapy_redis_bloomfilter import defaults
from scrapy_redis_bloomfilter.bloomfilter import BloomFilter
from scrapy_redis_bloomfilter.defaults import BLOOMFILTER_BIT, BLOOMFILTER_HASH_NUMBER, DUPEFILTER_DEBUG
from scrapy_splash.utils import dict_hash

logger = logging.getLogger(__name__)


def splash_request_fingerprint(request, include_headers=None):
    """Copied from scrapy_splash: fold the Splash options into the request fingerprint."""
    fp = request_fingerprint(request, include_headers=include_headers)
    if 'splash' not in request.meta:
        return fp
    splash_options = deepcopy(request.meta['splash'])
    args = splash_options.setdefault('args', {})
    if 'url' in args:
        args['url'] = canonicalize_url(args['url'], keep_fragments=True)
    return dict_hash(splash_options, fp)


# Inheriting from the scrapy_redis RFPDupeFilter keeps Redis compatibility.
# The Splash fingerprint logic is copied in above (splash_request_fingerprint),
# and the Bloom-filter logic is copied over from scrapy_redis_bloomfilter below.
class MyDupeFilter(BaseRFPDupeFilter):
    logger = logger

    def __init__(self, server, key, debug, bit, hash_number):
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True
        self.bit = bit
        self.hash_number = hash_number
        self.bf = BloomFilter(server, self.key, bit, hash_number)

    @classmethod
    def from_settings(cls, settings):
        server = get_redis_from_settings(settings)
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)
        bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)
        hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
        return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    @classmethod
    def from_spider(cls, spider):
        settings = spider.settings
        server = get_redis_from_settings(settings)
        dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
        key = dupefilter_key % {'spider': spider.name}
        debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)
        bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)
        hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)
        return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)

    def request_seen(self, request):
        fp = self.request_fingerprint(request)
        if self.bf.exists(fp):
            return True
        self.bf.insert(fp)
        return False

    def log(self, request, spider):
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
        spider.crawler.stats.inc_value('bloomfilter/filtered', spider=spider)

    def request_fingerprint(self, request):
        # Use the Splash-aware fingerprint so rendered requests are deduplicated correctly.
        return splash_request_fingerprint(request)
```
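To see what the Splash-aware fingerprint buys us, here is a small standalone check. It is only a sketch, assuming the code above is saved as news/dupefilter.py (matching the DUPEFILTER_CLASS path used in the settings below): two plain requests to the same URL share a fingerprint, while a request that carries Splash options gets a different one, so rendered and non-rendered requests are not mistaken for duplicates of each other.

```python
# Hypothetical sanity-check script; assumes the code above lives in news/dupefilter.py.
import scrapy

from news.dupefilter import splash_request_fingerprint

plain_a = scrapy.Request("https://news.163.com/")
plain_b = scrapy.Request("https://news.163.com/")
rendered = scrapy.Request(
    "https://news.163.com/",
    meta={"splash": {"args": {"url": "https://news.163.com/", "wait": 2}}},
)

# Same URL, no Splash meta -> identical fingerprints (the second request would be filtered).
assert splash_request_fingerprint(plain_a) == splash_request_fingerprint(plain_b)

# Same URL, but one request carries Splash options -> different fingerprints.
assert splash_request_fingerprint(plain_a) != splash_request_fingerprint(rendered)
print("fingerprints behave as expected")
```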
```python
# news/settings.py
BOT_NAME = 'news'

SPIDER_MODULES = ['news.spiders']
NEWSPIDER_MODULE = 'news.spiders'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False
LOG_LEVEL = "WARNING"

# scrapy_splash
# URL of the Splash rendering service; replace with your own
SPLASH_URL = 'http://192.168.31.172:8050'

# Downloader middlewares; these must be configured
DOWNLOADER_MIDDLEWARES = {
    'scrapy_splash.SplashCookiesMiddleware': 723,
    'scrapy_splash.SplashMiddleware': 725,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 810,
}

# Optional
# SPIDER_MIDDLEWARES = {
#     'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,
# }

# Use our own dupe filter, compatible with redis, splash and the Bloom filter at the same time
DUPEFILTER_CLASS = 'news.dupefilter.MyDupeFilter'

# Splash-aware HTTP cache storage; this must be configured
HTTPCACHE_STORAGE = 'scrapy_splash.SplashAwareFSCacheStorage'

# redis
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 301,  # optional
}

# Redis connection settings
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 14
REDIS_PARAMS = {
    "password": "123456",
}

# scrapy_redis settings (fixed values)
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# If True, keep the request queue and fingerprints in Redis when the spider closes; if False, discard them
SCHEDULER_PERSIST = True
# DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"  # note: replaced by our own filter above

# bloom filter
# To use the BloomFilter on its own you would set this DUPEFILTER_CLASS instead
# DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# Number of hash functions, 6 by default; adjust as needed
BLOOMFILTER_HASH_NUMBER = 6
# Bit parameter of the BloomFilter, 30 by default: 2**30 bits take 128 MB and deduplicate on the order of 100 million requests
BLOOMFILTER_BIT = 30
```
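Once the spider has run for a while, you can check in Redis that the scheduler and the Bloom filter really write to the same server. The snippet below is a minimal inspection sketch with redis-py, assuming the connection settings above and scrapy_redis's default key prefix (the spider name, e.g. wangyi:requests and wangyi:dupefilter); your key names may differ if you changed the scheduler or dupefilter key settings.

```python
# Hypothetical inspection script; key names assume scrapy_redis defaults.
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=14, password="123456")

# List every key this crawl has created in the configured database.
for key in r.keys("wangyi:*"):
    print(key, r.type(key))

# The Bloom filter lives in a single Redis string used as a bitmap,
# so its size is governed by BLOOMFILTER_BIT rather than by the number of fingerprints.
print(r.strlen("wangyi:dupefilter"))
```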
```python
import scrapy
from scrapy_splash.request import SplashRequest
from scrapy_redis.spiders import RedisSpider

from news.items import NewsItem

lua_source = """
function main(splash, args)
    assert(splash:go(args.url))
    assert(splash:wait(2))
    -- Pre-load a JS helper function.
    -- splash:jsfunc is Splash's bridge for combining JS code with Lua code.
    get_btn_display = splash:jsfunc([[
        function(){
            return document.getElementsByClassName('load_more_btn')[0].style.display;
        }
    ]])
    while(true) do
        splash:runjs("document.getElementsByClassName('load_more_btn')[0].scrollIntoView(true)")
        splash:select(".load_more_btn"):click()
        splash:wait(1)
        -- Stop once load_more_btn is hidden (display == 'none').
        display = get_btn_display()
        if(display == 'none') then
            break
        end
    end
    return splash:html()  -- return the rendered page source
end
"""


class WangyiSpider(RedisSpider):
    name = 'wangyi'
    allowed_domains = ['163.com']
    # redis_key = "wangyi:news:start_urls"
    start_urls = ['https://news.163.com/']

    # start_requests is overridden here, so redis_key is not needed.
    def start_requests(self):
        yield SplashRequest(
            url=self.start_urls[0],
            callback=self.parse,
            endpoint="execute",  # the Splash endpoint that runs the Lua script above
            args={
                "lua_source": lua_source
            },
            dont_filter=True  # skip deduplication, go straight into the queue
        )

    def parse(self, response):
        divs = response.xpath("//ul[@class='newsdata_list fixed_bar_padding noloading']/li[1]/div[2]/div")
        for div in divs:
            a = div.xpath("./div/div/h3/a")
            if not a:  # skip advertisements
                continue
            a = a[0]
            xw = NewsItem()
            xw['url'] = a.xpath("./@href").extract_first()
            xw['title'] = a.xpath("./text()").extract_first()
            yield xw
            # The detail request goes through the Redis scheduler and is deduplicated by the Bloom filter.
            yield scrapy.Request(
                url=xw['url'],
            )
```
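For reference, if you kept the commented-out redis_key instead of overriding start_requests, every node would sit idle until a start URL is pushed into that Redis list. The snippet below is a hypothetical seeding script using redis-py and the Redis settings from settings.py; note that by default a RedisSpider turns seeded URLs into plain Requests, so the start page would not go through Splash unless you also override make_request_from_data.

```python
# Hypothetical seeding script, only relevant if redis_key = "wangyi:news:start_urls" is enabled.
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=14, password="123456")
r.lpush("wangyi:news:start_urls", "https://news.163.com/")

# Verify the URL is waiting in the list for whichever node picks it up first.
print(r.lrange("wangyi:news:start_urls", 0, -1))
```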
```python
# news/items.py
import scrapy


class NewsItem(scrapy.Item):
    # define the fields for your item here like:
    title = scrapy.Field()
    url = scrapy.Field()
```
This section is fairly demanding, and a scenario like this comes up rarely. If you ever do face a similar requirement, I hope you can borrow the approach shown here for integrating framework and module source code. It also asks a lot of your object-oriented skills: you need to apply inheritance sensibly so that the requirements are met with the smallest possible code change.