import json
import re
import concurrent.futures

import requests
from requests.exceptions import RequestException


class Spider:
    """Download every part of a Bilibili video identified by its bvid."""

    def __init__(self, bvid):
        # Browser-like headers; the Bilibili API rejects requests without a
        # plausible User-Agent.
        self.headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',  # noqa
            'Accept-Charset': 'UTF-8,*;q=0.5',
            'Accept-Encoding': 'gzip,deflate,sdch',
            'Accept-Language': 'en-US,en;q=0.8',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.74 Safari/537.36 Edg/79.0.309.43',
        }
        self.bvid = bvid

    def get_page(self):
        """Fetch the page list (one entry per video part) as raw JSON text.

        Returns:
            The response body on HTTP 200, otherwise ``None``.
        """
        url = f'https://api.bilibili.com/x/player/pagelist?bvid={self.bvid}&jsonp=jsonp'
        try:
            # BUG FIX: the second positional argument of requests.get() is
            # `params`, not `headers` — the original silently appended the
            # headers to the query string. Pass them by keyword, with a timeout.
            r = requests.get(url, headers=self.headers, timeout=10)
            if r.status_code == 200:
                return r.text
            print(r.status_code)
        except RequestException:
            print('请求失败')
        return None

    def parse_page(self, html):
        """Resolve each part listed in *html* into a direct media URL.

        Args:
            html: JSON text returned by :meth:`get_page`.

        Returns:
            ``(url_list, name_list)`` — parallel lists of download URLs and
            part titles.
        """
        data = json.loads(html)
        results = data.get('data')
        url_list = []
        name_list = []
        for result in results:
            cid = result['cid']          # per-part id consumed by the playurl API
            video_name = result['part']  # part title, later used as the file name
            url = (
                f'https://api.bilibili.com/x/player/playurl'
                f'?cid={cid}&otype=json&bvid={self.bvid}'
            )
            # BUG FIX: headers passed by keyword (was positional -> params).
            res = requests.get(url, headers=self.headers, timeout=10)
            # Renamed local from `re` to `durl` — the original shadowed the
            # stdlib `re` module.
            durl = json.loads(res.text).get('data')['durl'][0]
            url_list.append(durl['url'])
            name_list.append(video_name)
        return url_list, name_list

    def download_video(self, url_list, name_list, i):
        """Stream part *i* (1-based) of the video to ``<part name>.mp4``.

        Args:
            url_list: direct media URLs from :meth:`parse_page`.
            name_list: part titles parallel to ``url_list``.
            i: 1-based part index (matches the site's ``?p=`` parameter).
        """
        # BUG FIX: build a per-call header dict instead of mutating
        # self.headers — this method runs on several threads at once, and the
        # shared dict's per-part `referer` raced between workers.
        headers = dict(self.headers)
        headers.update({
            'referer': f'https://www.bilibili.com/video/{self.bvid}?p={i}',
            'Connection': 'keep-alive',
            'Origin': 'https://www.bilibili.com',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, sdch, br',
            'Accept-Language': 'zh-CN,zh;q=0.8',
        })
        url = url_list[i - 1]
        r = requests.get(url, headers=headers, stream=True)
        # Part titles may contain characters that are illegal in file names;
        # replace them so open() cannot fail on e.g. '/' or ':'.
        safe_name = re.sub(r'[\\/:*?"<>|]', '_', name_list[i - 1])
        print(f'正在下载第{i}个视频,name:{name_list[i-1]}')
        with open(f'{safe_name}.mp4', 'wb') as mp4:
            # 1 MiB chunks keep memory flat regardless of video size.
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    mp4.write(chunk)
        print(f'第{i}个视频下载完成,name:{name_list[i-1]}')

    def run(self):
        """Fetch and parse the page list (downloads are driven externally)."""
        html = self.get_page()
        # BUG FIX: parse_page returns a (url_list, name_list) tuple; the
        # original assigned it to a single variable.
        url_list, name_list = self.parse_page(html)
        # self.download_video(url_list, name_list, 1)


if __name__ == '__main__':
    # Guarded so importing this module no longer triggers network requests
    # and downloads as a side effect.
    bvid = 'BV1og4y1q7M4'
    a = Spider(bvid)
    html = a.get_page()
    url_list, name_list = a.parse_page(html)
    # A with statement ensures worker threads are cleaned up promptly;
    # max_workers bounds the number of concurrent downloads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start one download per part and map each future back to its index.
        future_to_url = {
            executor.submit(a.download_video, url_list, name_list, i): i
            for i in range(1, len(url_list) + 1)
        }
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('success')