I wanted to build an auto-like and follow-back tool for Dianping in Python, since clicking everything by hand wastes far too much time.
Since this is implemented directly with Selenium, the flow is basically the same as a real user's: first locate the element, then perform the corresponding action on it.
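As a minimal sketch of that locate-then-act pattern (assuming the selenium 3.x `find_element_by_*` API used throughout this post, with chromedriver.exe next to the script; the later snippets reuse this `driver`):

```python
from selenium import webdriver

# assumes chromedriver.exe sits next to the script; adjust the path if needed
driver = webdriver.Chrome(executable_path=r"chromedriver.exe")
driver.get('http://www.dianping.com')

# the two-step pattern used everywhere below: locate first, then act,
# exactly the steps a human performs (XPath taken from the final code)
nav_link = driver.find_element_by_xpath('//*[@id="top-nav"]/div/div[2]/span[1]/a[1]')
nav_link.click()
```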
Since the tool auto-likes your fans' posts, you have to be logged in first so it can pick up Chrome's cookies (note: an auto-login feature would also be possible, but it felt like too much trouble, so I skipped it). Chrome's cookie path is C:\Users\<username>\AppData\Local\Google\Chrome\User Data\Default (note: the browser must be closed while the cookies are in use, or the program will throw an error).
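To reuse the logged-in session, point the driver at that profile directory. A minimal sketch (the path is resolved from %LOCALAPPDATA% instead of hard-coding the username; Chrome must be fully closed before this runs):

```python
import os
from selenium import webdriver

options = webdriver.ChromeOptions()
# the Default subfolder of this directory holds the cookies
user_data_dir = os.path.join(os.environ['LOCALAPPDATA'], r'Google\Chrome\User Data')
options.add_argument('--user-data-dir=' + user_data_dir)

# close every Chrome window first, or the driver fails to start
driver = webdriver.Chrome(executable_path=r"chromedriver.exe", options=options)
```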
This part is simple: locate the elements via XPath and simulate clicks, opening Home -> Fans in sequence.
Note: this opens a new tab, so you have to switch to the new tab before doing anything, or the elements can't be located. A sketch of both steps follows below.
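Continuing the `driver` sketch above, after clicking the nav entry the profile opens in a new tab; `window_handles` keeps the newest handle last, which is what makes the switch work (XPath copied from the final code):

```python
# the profile opened in a new tab; switch to it or nothing can be located
driver.switch_to.window(driver.window_handles[-1])

# then open the fans list from the profile page
driver.find_element_by_xpath(
    '/html/body/div[1]/div[2]/div[3]/div/div[1]/div[1]/div/div[1]/ul/li[2]/a').click()
```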
Inspecting the page shows that the follow button only appears while the mouse hovers over a fan entry, so we locate the entry, simulate the hover, and then click the follow button.
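The hover-then-click dance uses `ActionChains`; a sketch, where `span.J_add` is the selector the final code uses and `fan_ele` stands for one `<li>` entry of the fan list located earlier:

```python
from selenium.webdriver import ActionChains

# the follow button inside the fan entry is hidden until hover
follow_btn = fan_ele.find_element_by_css_selector('span.J_add')
ActionChains(driver).move_to_element(fan_ele).perform()  # simulate the hover
follow_btn.click()                                       # now it can be clicked
```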
Liking follows much the same logic: open the fan's profile, go to their reviews page, collect the articles that haven't been liked yet, and like them.
Note: opening a fan's page would normally require navigating back afterwards, so to keep things simple I open the page in another new tab and close that tab once the liking is done, as sketched below.
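A sketch of that throwaway-tab flow, continuing the `driver` from above (`fan_url` stands for the profile URL scraped from the fan list):

```python
# open the fan's profile in a fresh tab instead of navigating away and back
driver.execute_script('window.open("{}")'.format(fan_url))
driver.switch_to.window(driver.window_handles[-1])

# ... open the reviews page and like the articles here ...

driver.close()                                      # discard the tab when done
driver.switch_to.window(driver.window_handles[-1])  # return to the fan list
```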
Since both the fan list and a fan's articles can span multiple pages, we also need to turn pages. That is simple too: locate the next-page element and simulate a click. The stop condition is just as simple: as the screenshot below shows, the next-page element disappears once you reach the last page, so we only have to check whether it still exists.
Reaching the last page, as shown:
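In code, the whole pagination loop reduces to a sketch like this (`a.page-next` is the selector the final code uses; the bare `except` of the final code is narrowed here to `NoSuchElementException`):

```python
from selenium.common.exceptions import NoSuchElementException

while True:
    # ... like / follow everything on the current page ...
    try:
        # on the last page this element no longer exists
        next_btn = driver.find_element_by_css_selector('a.page-next')
    except NoSuchElementException:
        break  # no next-page element left: stop
    next_btn.click()
```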
Here is the final code:
```python
import abc
import base64
import getopt
import json
import logging
import os
import pickle
import random
import shutil
import signal
import sqlite3
import sys
import time
import winreg

import fake_useragent
import requests
import urllib3
from bs4 import BeautifulSoup
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from selenium import webdriver
from selenium.common.exceptions import InvalidArgumentException
from selenium.webdriver import ActionChains

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(level=logging.INFO)

# how many articles to like per fan
thumbs_up_count = 1


class google_util:
    """Chrome utility: reads and decrypts cookies from the local profile."""

    @classmethod
    def __dpapi_decrypt(cls, encrypted):
        import ctypes.wintypes

        class DATA_BLOB(ctypes.Structure):
            _fields_ = [('cbData', ctypes.wintypes.DWORD),
                        ('pbData', ctypes.POINTER(ctypes.c_char))]

        p = ctypes.create_string_buffer(encrypted, len(encrypted))
        blobin = DATA_BLOB(ctypes.sizeof(p), p)
        blobout = DATA_BLOB()
        retval = ctypes.windll.crypt32.CryptUnprotectData(
            ctypes.byref(blobin), None, None, None, None, 0, ctypes.byref(blobout))
        if not retval:
            raise ctypes.WinError()
        result = ctypes.string_at(blobout.pbData, blobout.cbData)
        ctypes.windll.kernel32.LocalFree(blobout.pbData)
        return result

    @classmethod
    def __aes_decrypt(cls, encrypted_txt):
        browser_data_path = google_util.get_user_data_path()
        with open(os.path.join(browser_data_path, r'Local State'), encoding='utf-8', mode="r") as f:
            jsn = json.loads(str(f.readline()))
        encoded_key = jsn["os_crypt"]["encrypted_key"]
        encrypted_key = base64.b64decode(encoded_key.encode())
        # strip the 'DPAPI' prefix
        encrypted_key = encrypted_key[5:]
        key = google_util.__dpapi_decrypt(encrypted_key)
        nonce = encrypted_txt[3:15]
        cipher = Cipher(algorithms.AES(key), None, backend=default_backend())
        cipher.mode = modes.GCM(nonce)
        decryptor = cipher.decryptor()
        return decryptor.update(encrypted_txt[15:])

    @classmethod
    def __chrome_decrypt(cls, encrypted_txt):
        if sys.platform == 'win32':
            try:
                if encrypted_txt[:4] == b'\x01\x00\x00\x00':
                    decrypted_txt = google_util.__dpapi_decrypt(encrypted_txt)
                    return decrypted_txt.decode()
                elif encrypted_txt[:3] == b'v10':
                    decrypted_txt = google_util.__aes_decrypt(encrypted_txt)
                    # the last 16 bytes are the GCM tag
                    return decrypted_txt[:-16].decode()
            except WindowsError:
                return None
        else:
            raise WindowsError

    @classmethod
    def get_cookie(cls, domain):
        browser_data_path = google_util.get_user_data_path()
        sql = f'SELECT name, encrypted_value as value FROM cookies where host_key like "%{domain}%"'
        file_name = os.path.join(browser_data_path, r'Default\Cookies')
        con = sqlite3.connect(file_name)
        con.row_factory = sqlite3.Row
        cur = con.cursor()
        cur.execute(sql)
        cookie = ''
        for row in cur:
            if row['value'] is not None:
                name = row['name']
                value = google_util.__chrome_decrypt(row['value'])
                if value is not None:
                    cookie += name + '=' + value + ';'
        return cookie

    @classmethod
    def get_install_path(cls):
        """Return the Chrome installation directory from the registry."""
        try:
            key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                                 r"SOFTWARE\Clients\StartMenuInternet\Google Chrome\DefaultIcon")
        except FileNotFoundError:
            return ''
        value, value_type = winreg.QueryValueEx(key, "")  # read the default value
        full_file_name = value.split(',')[0]  # drop the part after the comma
        dir_name, file_name = os.path.split(full_file_name)  # split path and file name
        return dir_name

    @classmethod
    def get_user_data_path(cls):
        """Return Chrome's user-data directory."""
        user_path = os.environ['LOCALAPPDATA']
        return user_path + r'\Google\Chrome\User Data'

    @classmethod
    def copy_user_data(cls, target_path=None):
        if target_path is None or target_path == '':
            target_path = os.path.split(os.path.realpath(__file__))[0]
        source_path = google_util.get_user_data_path()
        shutil.copytree(source_path, target_path)


class fan_article(object):
    """A fan's article (review)."""

    def __init__(self, article_id, article_name):
        self.__article_id = article_id
        self.__article_name = article_name

    @property
    def article_name(self):
        return self.__article_name

    @article_name.setter
    def article_name(self, article_name):
        self.__article_name = article_name

    @property
    def article_id(self):
        return self.__article_id

    @article_id.setter
    def article_id(self, article_id):
        self.__article_id = article_id


class selenium_fan_article(fan_article):
    """Fan article backed by a selenium element."""

    def __init__(self, article_id, article_name, ele):
        super(selenium_fan_article, self).__init__(article_id, article_name)
        self.__ele = ele

    @property
    def ele(self):
        return self.__ele

    @ele.setter
    def ele(self, ele):
        self.__ele = ele


class requests_fan_article(fan_article):
    """Fan article used by the requests implementation."""

    def __init__(self, article_id, article_name, referer_url):
        super(requests_fan_article, self).__init__(article_id, article_name)
        self.__referer_url = referer_url

    @property
    def referer_url(self):
        return self.__referer_url

    @referer_url.setter
    def referer_url(self, referer_url):
        self.__referer_url = referer_url


class fan(object):
    """A fan (follower)."""

    def __init__(self, id, name, url):
        self.__id = id
        self.__name = name
        self.__url = url

    @property
    def name(self):
        return self.__name

    @name.setter
    def name(self, name):
        self.__name = name

    @property
    def id(self):
        return self.__id

    @id.setter
    def id(self, id):
        self.__id = id

    @property
    def url(self):
        return self.__url

    @url.setter
    def url(self, url):
        self.__url = url

    def __eq__(self, other):
        return self.id == other.id


class selenium_fan(fan):
    """Fan backed by a selenium element."""

    def __init__(self, id, name, url, ele):
        super(selenium_fan, self).__init__(id, name, url)
        self.__ele = ele

    @property
    def ele(self):
        return self.__ele

    @ele.setter
    def ele(self, ele):
        self.__ele = ele

    def set_ele(self, ele):
        self.__ele = ele


class requests_fan(fan):
    """Fan used by the requests implementation."""

    def __init__(self, id, name, url, review_url):
        super(requests_fan, self).__init__(id, name, url)
        self.__review_url = review_url

    @property
    def review_url(self):
        return self.__review_url

    @review_url.setter
    def review_url(self, review_url):
        self.__review_url = review_url


class dianping_reptile(metaclass=abc.ABCMeta):
    """Abstract Dianping crawler: defines the template for the whole flow."""

    def __init__(self, is_continue=False, is_follow_fans=True, is_thumbs_up=True):
        self._cookie = None
        self._ua = fake_useragent.UserAgent(path='fake_useragent.json')
        # likes per fan
        self._thumbs_up_count = thumbs_up_count
        self._base_url = 'http://www.dianping.com'
        # fans already liked in previous runs
        self._already_thumbs_up_fans = []
        # fans liked in this run
        self._this_thumbs_up_fans = []
        # persistence file path
        self._ser_path = 'fans.ser'
        # initialized yet?
        self._init = False
        # continue from the last run?
        self._is_continue = is_continue
        self._is_stop = False
        # auto-like?
        self._is_thumbs_up = is_thumbs_up
        # follow back?
        self._is_follow_fans = is_follow_fans

    def _get_cookie(self):
        if self._cookie is None:
            self._cookie = google_util.get_cookie('.dianping.com')
            logging.debug('got cookie: {}'.format(self._cookie))
        return self._cookie

    def _init_reptile(self):
        """Initialization hook."""
        self._init = True

    def exit(self):
        """Crawler exit: persists state and calls the destroy hook."""
        logging.info('############### crawler exiting ###############')
        if self._is_stop is False:
            # when liking is enabled, persist the fan data
            if self._is_thumbs_up:
                self._write_fans()
                logging.info('fans liked in this run: {}'.format(len(self._this_thumbs_up_fans)))
            self._destroy()
            self._is_stop = True

    def start(self):
        """Crawler entry point: defines the template (execution steps)."""
        begin_time = time.time()
        if self._init is False:
            logging.info('############### crawler initializing ###############')
            self._init_reptile()
            self._read_fans()
        cookie = self._get_cookie()
        self._before(cookie)
        try:
            logging.info('############### crawler starting ###############')
            self._reptile()
        except Exception as e:
            raise e
        finally:
            end_time = time.time()
            run_time = end_time - begin_time
            logging.info('total run time: {0}'.format(run_time))
            self.exit()

    def _read_fans(self):
        """Load the fans that were already liked in previous runs."""
        if self._is_thumbs_up is False:
            return
        if self._is_continue:
            logging.info('############### loading the fans liked last time')
            if os.access(self._ser_path, os.R_OK) is False:
                logging.warning('######### file missing or unreadable')
                return
            if os.path.getsize(self._ser_path) == 0:
                logging.warning('######### no fans were liked last time')
                return
            with open(self._ser_path, 'rb') as f:
                self._already_thumbs_up_fans = pickle.load(f)
        else:
            logging.info('############### starting over, deleting the fans liked last time')
            if os.path.isfile(self._ser_path):
                os.remove(self._ser_path)

    def _write_fans(self):
        """Persist the fans liked in this run."""
        logging.info('############### persisting the fans liked in this run')
        with open(self._ser_path, "ab+") as f:
            if len(self._this_thumbs_up_fans) > 0:
                # drop selenium elements: they cannot be pickled
                for liked_fan in self._this_thumbs_up_fans:
                    if isinstance(liked_fan, selenium_fan):
                        liked_fan.set_ele(None)
                pickle.dump(self._this_thumbs_up_fans, f)

    def _before(self, cookie):
        """Hook: preparation before the crawl starts."""
        pass

    def _destroy(self):
        """Hook: called when the crawl ends."""
        pass

    def _follow_fans(self, fan):
        """Follow back."""
        if self._is_follow_fans:
            if self._do_follow_fans(fan):
                logging.info('################ followed back "{0}"'.format(fan.name))

    @abc.abstractmethod
    def _do_follow_fans(self, fan):
        pass

    def _thumbs_up_fan(self, fan):
        """Like a fan's articles."""
        if self._is_thumbs_up is False:
            return
        if self._is_continue and fan in self._already_thumbs_up_fans:
            return
        self._sleep_time()
        logging.debug('################ opening the detail page of "{0}"'.format(fan.name))
        self._goto_fan_review(fan)
        self._thumbs_up_article(fan)

    def _reptile(self):
        """Crawl the fan data."""
        while True:
            fans = self._get_fans()
            for fan in fans:
                self._follow_fans(fan)
                self._thumbs_up_fan(fan)
            if self._has_next_page_fan():
                self._sleep_time()
                logging.debug('################ next fan page')
                self._next_page_fan()
            else:
                break

    @abc.abstractmethod
    def _goto_fan_review(self, fan):
        """Open the fan's review page."""
        pass

    def _thumbs_up_article(self, fan):
        """Like the fan's articles until the like quota is reached."""
        logging.debug('################ running the pre-like hook')
        self._before_thumbs_up_article(fan)
        count = 0
        while True:
            self._sleep_time()
            # all not-yet-liked articles on the current page
            articles = self._get_unthumbs_up_article(fan)
            if len(articles) == 0:
                logging.info('############## fan "{0}" has no articles, or none left to like'.format(fan.name))
                break
            # shuffle the article order
            random.shuffle(articles)
            for article in articles:
                self._sleep_time()
                if self._thumbs_up(article):
                    logging.info('############## liked article "{1}" of fan "{0}"'.format(
                        fan.name, article.article_name))
                    count += 1
                else:
                    logging.error('############## failed to like article "{1}" of fan "{0}"'.format(
                        fan.name, article.article_name))
                if self._is_over_thumbs_count(count):
                    break
            if self._is_over_thumbs_count(count):
                logging.info('####################### done liking fan "{}" #######################'.format(fan.name))
                break
            # is there a next article page?
            if self._has_next_page_article():
                logging.debug('################ next article page')
                self._next_page_article(fan)
            else:
                logging.info('################ fan "{0}" has no next article page, liking incomplete'.format(fan.name))
                break
        self._this_thumbs_up_fans.append(fan)
        self._sleep_time()
        logging.debug('################ running the post-like hook')
        self._after_thumbs_up_article(fan)

    def _before_thumbs_up_article(self, fan):
        """Hook: before liking."""
        pass

    def _after_thumbs_up_article(self, fan):
        """Hook: after liking."""
        pass

    def _is_over_thumbs_count(self, count=0):
        """Has the like quota been reached?"""
        if self._thumbs_up_count > count:
            return False
        return True

    @abc.abstractmethod
    def _next_page_fan(self):
        """Go to the next fan page."""
        pass

    @abc.abstractmethod
    def _has_next_page_fan(self):
        """Is there a next fan page?"""
        pass

    @abc.abstractmethod
    def _has_next_page_article(self):
        """Is there a next article page?"""
        pass

    @abc.abstractmethod
    def _next_page_article(self, fan):
        """Go to the next article page."""
        pass

    @abc.abstractmethod
    def _get_fans(self):
        """Get the fans on the current page."""
        pass

    @abc.abstractmethod
    def _thumbs_up(self, article):
        """Like one article."""
        pass

    @abc.abstractmethod
    def _get_unthumbs_up_article(self, fan):
        """Get the not-yet-liked articles."""
        pass

    def _sleep_time(self):
        time.sleep(random.randint(3, random.randint(10, 15)))

    @property
    def base_url(self):
        return self._base_url

    @property
    def fan_thumbs_up_count(self):
        return self._thumbs_up_count


class selenium_dianping_reptile(dianping_reptile):
    """Selenium implementation."""

    def __init__(self, driver_path, is_continue=False, is_follow_fans=True, is_thumbs_up=True, is_show=True):
        super(selenium_dianping_reptile, self).__init__(is_continue, is_follow_fans, is_thumbs_up)
        self.__driver_path = driver_path
        self.__driver = None
        self.__options = None
        self.__is_show = is_show

    def __get_options(self):
        """Build the Chrome driver options."""
        if self.__options is None:
            self.__options = webdriver.ChromeOptions()
            # reuse the local profile so the login cookies are available
            self.__options.add_argument(r"--user-data-dir=" + google_util.get_user_data_path())
            # keep the site from detecting selenium
            self.__options.add_argument("--disable-blink-features=AutomationControlled")
            # also filters useless driver logs
            self.__options.add_experimental_option("excludeSwitches", ['enable-automation', 'enable-logging'])
            # self.__options.add_argument('User-Agent=Mozilla/5.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/600.1.3 (KHTML, like Gecko) Version/8.0 Mobile/12A4345d Safari/600.1.4')
            self.__options.add_argument(
                'User-Agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36')
            # run headless
            if self.__is_show is False:
                self.__options.add_argument('headless')
        return self.__options

    def get_driver(self):
        """Get the Chrome driver."""
        if self.__driver is None:
            try:
                self.__driver = webdriver.Chrome(executable_path=self.__driver_path, options=self.__get_options())
            except InvalidArgumentException as e:
                raise Exception('Chrome is already running, error: {}'.format(e.msg))
            # keep the site from detecting selenium
            self.__driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                })
                """
            })
        return self.__driver

    def _init_reptile(self):
        driver = self.get_driver()
        # open the home page
        driver.get(self._base_url)
        self._sleep_time()
        # click the profile entry in the top nav (opens a new tab)
        driver.find_element_by_xpath('//*[@id="top-nav"]/div/div[2]/span[1]/a[1]').click()
        # switch to the newest window
        self.__switch_newest_window()
        self._sleep_time()
        # open the fans page
        driver.find_element_by_xpath('/html/body/div[1]/div[2]/div[3]/div/div[1]/div[1]/div/div[1]/ul/li[2]/a').click()
        # open the following list instead:
        # driver.find_element_by_xpath('/html/body/div[1]/div[2]/div[3]/div/div[1]/div[1]/div/div[1]/ul/li[1]/a').click()
        self._init = True

    def __switch_newest_window(self):
        """Switch to the newest window."""
        windows = self.__driver.window_handles
        self.__driver.switch_to.window(windows[-1])

    def _goto_fan_review(self, fan):
        cmd = 'window.open("{}")'
        self.__driver.execute_script(cmd.format(fan.url))
        self.__switch_newest_window()
        self.__driver.find_element_by_xpath(
            '/html/body/div[1]/div[2]/div[1]/div/div/div/div[2]/div[3]/ul/li[2]/a').click()

    def _next_page_article(self, fan):
        """Click the next article page."""
        self.__driver.find_element_by_css_selector('a.page-next').click()

    def _next_page_fan(self):
        self.__driver.find_element_by_css_selector('a.page-next').click()

    def _has_next_page_fan(self):
        try:
            self.__driver.find_element_by_css_selector('a.page-next')
            return True
        except:
            return False

    def _has_next_page_article(self):
        try:
            self.__driver.find_element_by_css_selector('a.page-next')
            return True
        except:
            return False

    def _do_follow_fans(self, fan):
        try:
            # the follow button, shown only on hover
            add_ele = fan.ele.find_element_by_css_selector('span.J_add')
            ActionChains(self.__driver).move_to_element(fan.ele).perform()
            self._sleep_time()
            add_ele.click()
            return True
        except:
            return False

    def _get_fans(self):
        fan_eles = self.__driver.find_elements_by_xpath('/html/body/div[1]/div[2]/div[2]/div/div/div[2]/div/ul/li')
        fans = []
        for fan_ele in fan_eles:
            e = fan_ele.find_element_by_tag_name('h6').find_element_by_tag_name('a')
            fans.append(selenium_fan(e.get_attribute('user-id'), e.text, e.get_attribute('href'), fan_ele))
        return fans

    def _get_unthumbs_up_article(self, fan):
        article_eles = self.__driver.find_elements_by_xpath('//*[@id="J_review"]/div/ul/li/div')
        if len(article_eles) > 0:
            articles = []
            for article_ele in article_eles:
                try:
                    # the not-yet-liked marker
                    article_ele.find_element_by_css_selector('i.heart-s1')
                    a_ele = article_ele.find_element_by_tag_name('a')
                    id_ele = article_ele.find_element_by_css_selector('a.J_flower')
                    articles.append(selenium_fan_article(id_ele.get_attribute('data-id'), a_ele.text, article_ele))
                except:
                    continue
            return articles
        return []

    def _after_thumbs_up_article(self, fan):
        """Close the window after liking."""
        self.__driver.close()
        self.__switch_newest_window()

    def _thumbs_up(self, article):
        try:
            i_ele = article.ele.find_element_by_css_selector('i.heart-s1')
            i_ele.click()
            time.sleep(2)
            # if the not-yet-liked style is still present, the like failed
            article.ele.find_element_by_css_selector('i.heart-s1')
        except:
            return True
        return False

    def _destroy(self):
        # close the browser
        self.__driver.quit()


class requests_dianping_reptile(dianping_reptile):
    """Requests implementation. This one keeps getting throttled."""

    def __init__(self, is_continue=False, is_follow_fans=True, is_thumbs_up=True):
        super(requests_dianping_reptile, self).__init__(is_continue, is_follow_fans, is_thumbs_up)
        self.__user_card_url = self._base_url + '/dpnav/userCardData'
        self.__user_fans_url = self._base_url + '/member/'
        self.__thumbs_up_url = self._base_url + '/ajax/json/shop/reviewflower'
        self.__fans_current_page = 1
        self.__fans_page_count = 0
        self.__article_current_page = 1
        self.__article_page_count = 0

    def _before(self, cookie):
        self.__get_user_info()
        self.__fans_page_count = self.__get_page_count(url=self.__user_fans_url)

    def __get_url_content(self, url, type='html', referer=None):
        """Fetch a url and parse the response."""
        headers = self.__get_headers(referer)
        self._sleep_time()
        result = requests.get(url, headers=headers)
        if type == 'html':
            return BeautifulSoup(result.content, 'lxml')
        else:
            return json.loads(result.content.decode("utf-8"))

    def __get_headers(self, referer):
        """Build request headers with a random user agent."""
        return {'Cookie': self._cookie,
                'User-Agent': self._ua.random,
                'Referer': referer,
                'Host': 'www.dianping.com'}

    def __get_article_page_count(self, fan_url):
        """Get the page count of a fan's articles."""
        fan_url = fan_url + '/reviews'
        return self.__get_page_count(fan_url)

    def __get_page_count(self, url):
        """Get the total page count."""
        html = self.__get_url_content(url, referer=self._base_url)
        page_arr = html.select("div.pages-num a")
        # pagination is present
        if len(page_arr) > 0:
            # drop the "next page" link
            page_arr.pop()
            return int(page_arr[len(page_arr) - 1].getText())
        return 1

    def __get_user_info(self):
        """Get the user info and build the fans page url."""
        result = self.__get_url_content(url=self.__user_card_url, type='json')
        self.__user_fans_url = self.__user_fans_url + str(result['msg']['userCard']['userId']) + '/fans'

    def _goto_fan_review(self, fan):
        pass

    def _next_page_fan(self):
        self.__fans_current_page += 1

    def _has_next_page_fan(self):
        if self.__fans_page_count >= self.__fans_current_page:
            return True
        return False

    def _has_next_page_article(self):
        if self.__article_page_count >= self.__article_current_page:
            return True
        return False

    def _next_page_article(self, fan):
        self.__article_current_page += 1

    def _do_follow_fans(self, fan):
        pass

    def __get_fan_url(self):
        """Build the fan page url."""
        return self.__user_fans_url + '?pg=' + str(self.__fans_current_page)

    def _get_fans(self):
        url = self.__get_fan_url()
        logging.info('###### fan page url: {0}'.format(url))
        html = self.__get_url_content(url, referer=self.__user_fans_url)
        fan_eles = html.select("div.fllow-list div ul li")
        fans = []
        for fan_ele in fan_eles:
            a_ele = fan_ele.select_one('div.txt div.tit h6 a')
            fan_url = self._base_url + a_ele.attrs['href']
            fan = requests_fan(id=a_ele.attrs['user-id'], name=a_ele.text, url=fan_url,
                               review_url=fan_url + '/reviews?pg={0}&reviewCityId=0&reviewShopType=0&c=0&shopTypeIndex=0')
            fans.append(fan)
        return fans

    def _thumbs_up(self, article):
        thumbs_up_headers = self.__get_headers(referer=article.referer_url)
        thumbs_up_headers['Origin'] = self._base_url
        params = {'t': 1, 's': 2, 'do': 'aa', 'i': article.article_id}
        result = requests.post(self.__thumbs_up_url, headers=thumbs_up_headers, params=params)
        logging.debug('result: ' + result.content.decode("utf-8"))
        if result.status_code == 200:
            result_json = json.loads(result.content.decode("utf-8"))
            # code 900 means the like succeeded
            if result_json['code'] == 900:
                return True
            return False
        if result.status_code == 403:
            raise Exception('crawler throttled')

    def __get_article_url(self, fan):
        return fan.review_url.format(self.__article_current_page)

    def _get_unthumbs_up_article(self, fan):
        url = self.__get_article_url(fan)
        html = self.__get_url_content(url, referer=fan.url)
        article_eles = html.select("div.J_rptlist")
        unthumbs = []
        for article_ele in article_eles:
            # keep only the not-yet-liked articles
            if article_ele.find(class_="heart-s1") is not None:
                ele = article_ele.find('div', attrs={'class': 'tit'})
                unthumbs.append(requests_fan_article(article_ele.find(class_="heart-s1").parent.attrs['data-id'],
                                                     ele.text,
                                                     referer_url=fan.url + '/reviews'))
        return unthumbs

    def _after_thumbs_up_article(self, fan):
        # reset the article paging after liking
        self.__article_current_page = 1
        self.__article_page_count = 0


def get_args():
    # CLI argument parsing (unused stub)
    opts, args = getopt.getopt(sys.argv[1:], "ho:", ["help", "output="])


def exit_handler(signum, frame):
    # SIGINT handler, wired up via the commented-out signal.signal below
    logging.info('################### crawler stopped')
    print(signum)


if __name__ == '__main__':
    reptile = selenium_dianping_reptile(r"chromedriver.exe", is_follow_fans=True, is_thumbs_up=True)
    # reptile = requests_dianping_reptile(is_follow_fans=False, is_thumbs_up=True)
    # signal.signal(signal.SIGINT, exit_handler)
    reptile.start()
```
I originally used requests, but I never figured out the site's anti-crawler mechanism and got throttled at every turn (╥╯^╰╥), so I switched to Selenium instead ( ̄▽ ̄)~*. If anyone knows why the requests version keeps getting blocked, please let me know (〃'▽'〃)