转载自:
国庆节,企查查我来啦~_user_from_future的博客-CSDN博客强迫症的我凑个国庆节注册了账号,直接这么发好似不太好。受到某位女生的工作需求,加上重色轻友的心,所以先拿企查查开刀吧。首先企查查这个网站不登陆也能查公司,不过坑人的就是只能查那么几次,然后就必须要登录了。我想想为了那几次不值得,就搞个登录的爬虫程序吧。众所周知,登录最重要的参数是Cookie,这个一般在浏览器的XHR(XMLHttpRequest对象/Ajax对象等)里复制任意一个元素的Cookie就可以了,建议使用CV大法复制,右击Copy value可能会复制到中文,在此先献上不知道从哪搜到的读https://blog.csdn.net/user_from_future/article/details/120576842我在此将两个文件组合成了一个文件,并对获取cookie部分附写了一些不知道算不算正确的注释,下面是我整理的代码:
# _*_ coding:utf-8 _*_ # FileName: get_qcc_company.py # IDE: PyCharm # 菜菜代码,永无BUG! # https://www.qcc.com/ import sqlite3 import urllib3 import os import json import sys import base64 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes import browsercookie from urllib import parse from bs4 import BeautifulSoup import json import time import requests from random import uniform urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 取消HTTPS安全警告 def dpapi_decrypt(encrypted): import ctypes import ctypes.wintypes class DATA_BLOB(ctypes.Structure): # ctypes结构体通用格式化输出 _fields_ = [('cbData', ctypes.wintypes.DWORD), # 定义double word(4字节)大小的值 ('pbData', ctypes.POINTER(ctypes.c_char))] # 定义内存指针 p = ctypes.create_string_buffer(encrypted, len(encrypted)) # 生成C类型字符串组 blobin = DATA_BLOB(ctypes.sizeof(p), p) # 一个指向DATA_BLOB结构体的指针 blobout = DATA_BLOB() # 一个指向解密后的数据的DATA_BLOB # 中间五个参数:描述该加密数据的信息、一个指向含有密钥DATA_BLOB的指针、保留参数、不需要弹出风险提升提示设置为None、安全相关的标志 retval = ctypes.windll.crypt32.CryptUnprotectData( ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout)) if not retval: raise ctypes.WinError() result = ctypes.string_at(blobout.pbData, blobout.cbData) # 获取解密结果 ctypes.windll.kernel32.LocalFree(blobout.pbData) # 释放pbData指向的内存 return result def aes_decrypt(encrypted_txt): with open(os.path.join(os.environ['LOCALAPPDATA'], r"Google\Chrome\User Data\Local State"), encoding='utf-8', mode="r") as f: # 读取本地状态 jsn = json.loads(str(f.readline())) # 读取为json类型 encoded_key = jsn["os_crypt"]["encrypted_key"] # 获取加密键值 encrypted_key = base64.b64decode(encoded_key.encode()) # 解密关键键值 encrypted_key = encrypted_key[5:] # 获取关键键值的关键部位 key = dpapi_decrypt(encrypted_key) # 解密关键键值 nonce = encrypted_txt[3:15] # 获取关键键值的关键部位 cipher = Cipher(algorithms.AES(key), None, backend=default_backend()) # 创建一个空的AES加密对象 cipher.mode = modes.GCM(nonce) # 采用GCM加密模式,初始化向量采用关键值的关键部位 decryptor = cipher.decryptor() # 解密AES return decryptor.update(encrypted_txt[15:]) # 更新解密对象 def chrome_decrypt(encrypted_txt): if sys.platform == 'win32': # 判断系统为Windows try: # 依据字符串开头判断解密方案 if encrypted_txt[:4] == b'x01x00x00x00': decrypted_txt = dpapi_decrypt(encrypted_txt) # 采用dpapi解密 return decrypted_txt.decode() elif encrypted_txt[:3] == b'v10': decrypted_txt = aes_decrypt(encrypted_txt) # 采用aes解密 return decrypted_txt[:-16].decode() except WindowsError: return None else: raise WindowsError def get_cookies_from_chrome(domain): sql = f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{domain}%"' # 获取cookie的sql语句 filename = os.path.join(os.environ['USERPROFILE'], r'AppData\Local\Google\Chrome\User Data\default\Cookies') # 本地cookies文件路径拼接 con = sqlite3.connect(filename) # 使用sqlite3连接cookies数据库 con.row_factory = sqlite3.Row # 需要允许其他人写权限 cur = con.cursor() # 获取游标 cur.execute(sql) # 执行sql语句 cookie = '' # 初始化cookie for row in cur: if row['value'] is not None: name = row['name'] # cookie的键 value = chrome_decrypt(row['value']) # cookie的值 if value is not None: cookie += name + '=' + value + ';' # 拼接cookie return cookie str_time = lambda _: _ == 253392422400 and "9999-09-09" or _ and time.strftime("%Y-%m-%d", time.localtime(_)) or "无固定期限" # 格式化日期 # 格式化网页访问参数 def parse_parameters(string: str): parameters = {} string = string.strip().replace(' ', '') if ':' not in string and '&' in string: for _ in string.split('&'): try: parameters[_.split('=')[0]] = _.split('=')[1] except IndexError: parameters[_.split('=')[0]] = '' else: for _ in string.split('\n'): _ = _.strip() try: parameters[_.split(':')[0]] = _.split(':')[1] except IndexError: parameters[_.split(':')[0]] = '' return parameters # 格式化cookies值 def parse_cookies(cookie_value: str): cookies_dict = {} for c in cookie_value.replace(' ', '').split(';'): try: cookies_dict[c.split('=')[0]] = c.split('=')[1] except IndexError: cookies_dict[c.split('=')[0]] = '' return cookies_dict # json格式化 def dump_json(text: (str, list, tuple, dict)): return json.dumps(text, ensure_ascii=False, indent=4) # 随机休眠,防止过快的爬取 def random_sleep(a=1, b=2): sleep_time = uniform(a, b) time.sleep(sleep_time) doMain = 'qcc.com' # 企查查域名 search_url = "https://www." + doMain + "/web/search" + "?" # 企查查搜索根网址 headers = { "referer": "https://www.qcc.com/", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36" } input_cookie = '' # 可选手动输入cookie值 cookies = input_cookie or get_cookies_from_chrome(doMain) # 无手动输入则从cookie文件中读取 while not (parse_cookies(cookies).get("QCCSESSID", None) and parse_cookies(cookies).get("qcc_did", None)): # 关键参数检测 input('请在浏览器登录企查查!') # 获取公司数据 def get_company(company_name): parameters = f""" key: {company_name} """ parameters = parse_parameters(parameters) r = requests.get(search_url + parse.urlencode(parameters), headers=headers, cookies={"cookie": cookies}) if r.ok: soup = BeautifulSoup(r.text, "html.parser") table = soup.find("table", attrs={"class": "ntable ntable-list"}) if table is None: return f"未搜寻到公司 “{company_name}” !" for tr in table.find_all("tr"): info = tr.find_all("td")[2].find("div") if info.find("a").find("span") is None: continue name_ = info.find("a").find("span").text.replace('(', '(').replace(')', ')') url = info.find("a")["href"] no_kh_things = name_.replace(name_[name_.find('('): name_.rfind(')') + 1], '') no_kh = name_.replace('(', '').replace(')', '') if company_name != no_kh_things and company_name != no_kh: continue r = requests.get(url, headers=headers, cookies={"cookie": cookies}) if r.ok: r.encoding = 'utf-8' soup = BeautifulSoup(r.text, "html.parser") script = soup.find_all('script') for s in script: if 'window.__INITIAL_STATE__' in s.text: script = s.text break else: return '请清除谷歌浏览器缓存,并重新登录企查查重新执行程序!如果多次出现此提示,请手动复制任意XHR的cookie值赋予到cookie变量!' detail = json.loads(script[script.find('{'): script.rfind('};') + 1])["company"]["companyDetail"] return { "企业名称": detail["Name"], "信息更新时间": str_time(detail["UpdatedDate"]), "法定代表人": detail["Oper"]["Name"], "登记状态": detail["Status"], "统一社会信用代码": detail["CreditCode"], "工商注册号": detail["No"], "组织机构代码": detail["OrgNo"], "纳税人识别号": detail["TaxNo"], "纳税人资质": detail.get("TaxpayerType", ''), "注册资本": detail["RegistCapi"], "实缴资本": detail["RecCap"], "登记机关": detail["BelongOrg"], "成立日期": str_time(detail["TermStart"]), "核准日期": str_time(detail["CheckDate"]), "营业期限": str_time(detail["TermStart"]) + "至" + str_time(detail["TeamEnd"]), "注册地址": detail["Address"], "宗旨和业务范围": detail["Scope"], "企业类型": detail["EconKind"], "所属行业": detail["Industry"]["SubIndustry"], "所属地区": detail["Area"]["Province"], "人员规模": detail["profile"]["Info"], "参保人数": [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"] and [_["Value"] for _ in detail["CommonList"] if _.get("KeyDesc", "") == "参保人数"][0] or '', "英文名": detail["EnglishName"], "曾用名": detail["OriginalName"] and [_["Name"] for _ in detail["OriginalName"]] or [] } return f"获取公司 “{name_}” 详情信息失败!" return f"未搜寻到公司 “{company_name}” !" return "搜索失败!" if __name__ == '__main__': print(dump_json(get_company('浙江阿瓦隆科技有限公司')))