Without further ado, here is the code. The comments are detailed, and you can use it as-is.
""" 此脚本只解决文件源下载慢的问题,并不会提升带宽。 """ import os import httpx from tqdm import tqdm from threading import Thread # 开辟线程数量,建议1-10个以内 THREAD_NUM = 5 tqdm_obj = None # 下载文件地址 download_url = "https://acvrpublicycchen.blob.core.windows.net/dialogpt/keys-full.tar" def download_file(file_path, thread_index, etag, start_index, stop_index): sub_path_file = "{}_{}".format(file_path, thread_index) if os.path.exists(sub_path_file): temp_size = os.path.getsize(sub_path_file) # 本地已经下载的文件大小 tqdm_obj.update(temp_size) # 更新下载进度条 else: temp_size = 0 if stop_index == '-': stop_index = "" headers = {'Range': 'bytes={}-{}'.format(start_index + temp_size, stop_index), 'ETag': etag, 'if-Range': etag, } with open(sub_path_file, 'ab') as down_file: with httpx.stream("GET", download_url, headers=headers) as response: num_bytes_downloaded = response.num_bytes_downloaded for chunk in response.iter_bytes(): if chunk: down_file.write(chunk) tqdm_obj.update(response.num_bytes_downloaded - num_bytes_downloaded) num_bytes_downloaded = response.num_bytes_downloaded return def get_file_size(): """ 获取预下载文件大小和文件etag :return: """ with httpx.stream("GET", download_url) as response2: etag = '' total_size = int(response2.headers["Content-Length"]) for tltle in response2.headers.raw: if tltle[0].decode() == "ETag": etag = tltle[1].decode() break return total_size, etag def cutting(file_size, thread_num): """ 切割成若干份 :param file_size: 下载文件大小 :param thread_num: 线程数量 :return: """ cut_info = {} cut_size = file_size // thread_num for num in range(1, thread_num + 1): if num != 1: cut_info[num] = [cut_size, cut_size * (num - 1) + 1, cut_size * num] else: cut_info[num] = [cut_size, cut_size * (num - 1), cut_size * num] if num == thread_num: cut_info[num][2] = '-' return cut_info def write_file(file_path, file_size): """ 合并分段下载的文件 :param file_path: :return: """ if os.path.exists(file_path): if len(file_path) >= file_size: return with open(file_path, 'ab') as f_count: for thread_index in range(1, THREAD_NUM + 1): with open("{}_{}".format(file_path, thread_index), 'rb') as sub_write: f_count.write(sub_write.read()) # 合并完成删除子文件 os.remove("{}_{}".format(file_path, thread_index)) return def create_thread(file_path, etag, cut_info): """ 开辟多线程下载 :param file_path: 文件存储路径 :param etag: headers校验 :param cut_info: :return: """ thread_list = [] for thread_index in range(1, THREAD_NUM + 1): thread_list.append(Thread(target=download_file, args=( file_path, thread_index, etag, cut_info[thread_index][1], cut_info[thread_index][2]))) for t in thread_list: t.setDaemon(True) t.start() for t in thread_list: t.join() return def main(): # 平分几份 global tqdm_obj file_size, etag = get_file_size() # 按线程数量均匀切割下载文件 cut_info = cutting(file_size, THREAD_NUM) # 下载文件名称 data_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'Data') if not os.path.exists(data_folder): os.mkdir(data_folder) file_path = os.path.join(data_folder, download_url.split('/')[-1]) # 创建下载进度条 tqdm_obj = tqdm(total=file_size, unit_scale=True, desc=file_path.split('/')[-1], unit_divisor=1024, unit="B", ) # 开始多线程下载 create_thread(file_path, etag, cut_info) # 合并多线程下载文件 write_file(file_path, file_size) return if __name__ == '__main__': main()