需要解决的问题:
使用什么方式存储ip
文件存储
缺点: 打开文件修改文件操作较麻烦
mysql
缺点: 查询速度较慢
mongodb
缺点: 查询速度较慢. 没有查重功能
redis --> 使用redis存储最为合适
所以 -> 数据结构采用redis中的zset有序集合: 成员天然去重, 并且可以用score记录每个ip的可用性分值
获取ip的网站
https://ip.jiangxianli.com/
https://free.kuaidaili.com/free/intr/
项目架构
项目结构图
项目结构如下:
code文件夹
redis_proxy.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
Redis middleman for the proxy pool.

Every read and write of proxy addresses goes through ``RedisProxy`` so that
all callers share one scoring policy on the ``proxy_ip`` sorted set.
"""
from redis import Redis
import random


class RedisProxy:
    """Score-keeping wrapper around the ``proxy_ip`` sorted set.

    Scoring policy implemented by the methods below:
      * a newly added proxy starts with score 10;
      * a proxy that passes verification is set to 100;
      * each failed verification subtracts 1, and a proxy whose score is
        no longer positive is removed;
      * ``effect_ip`` hands out score-100 proxies first, then proxies
        scored 11-99, and nothing otherwise.
    """

    def __init__(self):
        # Connect to the Redis database.
        self.red = Redis(
            host='localhost',
            port=6379,
            db=9,
            password=123456,
            decode_responses=True
        )

    def add_ip(self, ip):
        """Store ``ip`` with the initial score 10 unless it already exists.

        ``nx=True`` makes Redis add the member only when it is absent, so an
        existing proxy keeps its current score. The previous
        ``not zscore(...)`` test treated a stored score of 0 as "missing"
        and reset such proxies back to 10.
        """
        added = self.red.zadd('proxy_ip', {ip: 10}, nx=True)
        if added:
            print('proxy_ip存储完毕', ip)
        else:
            print('存在重复', ip)

    def get_all_proxy(self):
        """Return every stored proxy address, lowest score first."""
        return self.red.zrange('proxy_ip', 0, -1)

    def set_max_score(self, ip):
        """Set the score of ``ip`` to the maximum, 100."""
        # zadd takes a {member: score} mapping.
        self.red.zadd('proxy_ip', {ip: 100})

    def deduct_score(self, ip):
        """Subtract one point from ``ip``; remove it once no points remain.

        Does nothing when ``ip`` is not in the set. ``zscore`` returns
        ``None`` in that case and ``None > 0`` would raise ``TypeError``.
        """
        score = self.red.zscore('proxy_ip', ip)
        if score is None:
            return
        if score > 0:
            self.red.zincrby('proxy_ip', -1, ip)
        else:
            # Nothing left to deduct: drop the proxy.
            self.red.zrem('proxy_ip', ip)

    def effect_ip(self):
        """Return one usable proxy chosen at random, or ``None``.

        Score-100 proxies are preferred. If there are none, the choice
        falls back to scores 11-99. Within this class such scores only
        arise from a 100 that has since lost points, so proxies still at
        the initial score 10 are never handed out.
        """
        for low, high in ((100, 100), (11, 99)):
            ips = self.red.zrangebyscore('proxy_ip', low, high, 0, -1)
            if ips:
                return random.choice(ips)
        print('无可用ip')
        return None
ip_collection.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
Collect free proxy addresses from public listing pages into the pool.
"""
from redis_proxy import RedisProxy
import requests
from lxml import html
from multiprocessing import Process
import time
import random

# Browser-like User-Agent sent with every listing-page request.
_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}

# Seconds to wait for a listing site. Without a timeout a single stalled
# site would block the collection loop forever.
_REQUEST_TIMEOUT = 10


def _table_rows(url):
    """Download ``url`` and return every ``<tr>`` found inside a table."""
    resp = requests.get(url, headers=_HEADERS, timeout=_REQUEST_TIMEOUT)
    et = html.etree.HTML(resp.text)
    if et is None:
        # Nothing parseable came back.
        return []
    return et.xpath('//table//tr')


def _store_ip_port_rows(red, url):
    """Store ``ip:port`` built from the first two cells of each table row."""
    for tr in _table_rows(url):
        ip = tr.xpath('./td[1]/text()')
        port = tr.xpath('./td[2]/text()')
        if not ip or not port:
            # Header rows and incomplete rows carry no usable address.
            continue
        host, port_text = ip[0].strip(), port[0].strip()
        if host and port_text:
            red.add_ip(host + ":" + port_text)


def _store_first_cell_rows(red, url):
    """Store the stripped text of the first cell of each table row.

    NOTE(review): only the first cell is stored, so it presumably already
    holds ``ip:port`` on these sites - confirm against the live pages.
    """
    for tr in _table_rows(url):
        ip = tr.xpath('./td[1]/text()')
        if not ip:
            continue
        address = ip[0].strip()
        if address:
            red.add_ip(address)


def get_kuai_ip(red):
    """Collect proxies from free.kuaidaili.com into ``red``."""
    _store_ip_port_rows(red, "https://free.kuaidaili.com/free/intr/")


def get_unknown_ip(red):
    """Collect proxies from ip.jiangxianli.com into ``red``."""
    _store_ip_port_rows(red, "https://ip.jiangxianli.com/")


def get_happy_ip(red):
    """Collect proxies from a random page (1-5) of kxdaili.com."""
    page = random.randint(1, 5)
    _store_ip_port_rows(red, f'http://www.kxdaili.com/dailiip/2/{page}.html')


def get_nima_ip(red):
    """Collect proxies from nimadaili.com into ``red``."""
    _store_first_cell_rows(red, 'http://www.nimadaili.com/')


def get_89_ip(red):
    """Collect proxies from a random page (1-26) of 89ip.cn."""
    page = random.randint(1, 26)
    _store_first_cell_rows(red, f'https://www.89ip.cn/index_{page}.html')


def main():
    """Run every enabled collector once a minute, forever.

    Each collector is guarded on its own, so one unreachable site no longer
    prevents the remaining sites from being collected in the same round.
    ``get_unknown_ip`` and ``get_nima_ip`` are defined but not enabled.
    """
    red = RedisProxy()
    print("开始采集数据")
    # (banner, collector) pairs; add further listing sites here.
    collectors = (
        ('>>>开始收集快代理ip', get_kuai_ip),
        (">>>开始收集开心代理ip", get_happy_ip),
        (">>>开始收集89代理ip", get_89_ip),
    )
    while 1:
        for banner, collect in collectors:
            print(banner)
            try:
                collect(red)
            except Exception as e:
                print('ip储存出错了', e)
        time.sleep(60)


if __name__ == '__main__':
    main()
    # To run in a child process instead:
    # p = Process(target=main)
    # p.start()
ip_verify.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:34
# @author: Maxs_hu
"""
Verify that stored proxies actually work.

Requests are sent concurrently with coroutines to keep a round short.
"""
from redis_proxy import RedisProxy
from multiprocessing import Process
import asyncio
import aiohttp
import time


async def verify_ip(ip, red, sem, session=None):
    """Probe one proxy and update its score in ``red``.

    A GET to http://www.baidu.com/ is sent through ``ip``. Status 200 or
    302 sets the proxy to the maximum score; any other status, a timeout or
    any other error costs it one point.

    Args:
        ip: proxy address; it is appended to ``http://`` to form the proxy URL.
        red: object providing ``set_max_score(ip)`` and ``deduct_score(ip)``.
        sem: semaphore limiting how many probes run at once.
        session: optional shared ``aiohttp.ClientSession``. When omitted, a
            private session is opened for this single probe.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            await verify_ip(ip, red, sem, own_session)
        return

    # Give up on a proxy after ten seconds in total.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with sem:
            async with session.get(url='http://www.baidu.com/', proxy='http://' + ip, timeout=timeout) as resp:
                # Read the body so a proxy that stalls mid-transfer still
                # fails. The bytes are not decoded, because a charset
                # problem says nothing about the proxy.
                await resp.read()
                if resp.status in [200, 302]:
                    # Working proxy: maximum score.
                    red.set_max_score(ip)
                    print('验证没有问题. 分值拉满~', ip)
                else:
                    # Unexpected status: lose a point.
                    red.deduct_score(ip)
                    print('问题ip. 扣一分', ip)
    except Exception as e:
        print('出错了', e)
        red.deduct_score(ip)
        print('问题ip. 扣一分', ip)


async def task(red):
    """Verify every proxy currently stored in ``red``, at most 30 at a time."""
    ips = red.get_all_proxy()
    if not ips:
        return
    sem = asyncio.Semaphore(30)
    # One session serves the whole round; the proxy is chosen per request.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(verify_ip(ip, red, sem, session) for ip in ips),
            return_exceptions=True,
        )
    # Report failures that escaped verify_ip (for example an error raised
    # while deducting the score) instead of leaving them unretrieved.
    for result in results:
        if isinstance(result, BaseException):
            print('出错了', result)


def main():
    """Run a verification round, sleep 100 seconds, and repeat forever."""
    red = RedisProxy()
    time.sleep(5)  # initial wait before the first verification round
    print("开始验证可用性")
    while 1:
        try:
            asyncio.run(task(red))
            time.sleep(100)
        except Exception as e:
            print("ip_verify出错了", e)
            time.sleep(100)


if __name__ == '__main__':
    main()
    # To run in a child process instead (pass the function, do not call it):
    # p = Process(target=main)
    # p.start()
ip_api.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:35
# @author: Maxs_hu
"""
HTTP endpoint that hands a usable proxy to clients.

Candidate frameworks were:
    1. flask
    2. sanic --> used here, as the slightly simpler option
"""
from redis_proxy import RedisProxy
from sanic import Sanic, json
from sanic_cors import CORS
from multiprocessing import Process

# The application object; the name is arbitrary.
app = Sanic('ip')
# Allow cross-origin requests.
CORS(app)

red = RedisProxy()


@app.route('maxs_hu_ip')  # register the route
def api(req):
    """Answer with ``{"ip": <proxy or null>}``.

    ``req`` is the request object that is always passed as the first
    argument; it is not used.
    """
    return json({"ip": red.effect_ip()})


def main():
    """Start the Sanic server on 127.0.0.1:1234."""
    app.run(host='127.0.0.1', port=1234)


if __name__ == '__main__':
    main()
    # p = Process(target=main())
    # p.start()
runner.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/5 17:36
# @author: Maxs_hu
"""Start the API, the collector and the verifier, each in its own process."""
from ip_api import main as api_run
from ip_collection import main as coll_run
from ip_verify import main as veri_run
from multiprocessing import Process


def main():
    """Create one process per service, then start them in order."""
    # Pass the function objects themselves as targets; do not call them.
    workers = [Process(target=entry) for entry in (api_run, coll_run, veri_run)]
    for worker in workers:
        worker.start()


if __name__ == '__main__':
    main()
测试ip是否可用.py
# -*- encoding:utf-8 -*-
# @time: 2022/7/5 18:15
# @author: Maxs_hu
"""Manual check: fetch a proxy from the local API and send a request through it."""
import requests


def get_proxy():
    """Return the JSON payload of the local proxy API (``{"ip": ...}``)."""
    url = "http://127.0.0.1:1234/maxs_hu_ip"
    resp = requests.get(url, timeout=10)
    return resp.json()


def main():
    """Request an IP-lookup page through one proxy and print the response."""
    # Ask the API once. Separate calls may each return a different proxy,
    # so the queried address and the proxy in use would not match.
    ip = get_proxy()["ip"]
    if not ip:
        # The API returned no proxy address.
        print('无可用ip')
        return

    url = 'http://mip.chinaz.com/?query=' + ip
    proxy_url = 'http://' + ip  # the proxies only speak plain HTTP for now
    proxies = {
        "http": proxy_url,
        "https": proxy_url
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
    }
    # Set a timeout so an unresponsive proxy cannot hang the script.
    resp = requests.get(url, proxies=proxies, headers=headers, timeout=10)
    resp.encoding = 'utf-8'
    print(resp.text)  # the page reporting the physical location


if __name__ == '__main__':
    main()
项目运行截图:
redis储存截图: