有时候在进行数据处理时,处理的数据量非常庞大,在不使用并行化处理时可能需要若干小时。为了加快处理速度,使用多线程机制是非常常用的做法。本文简单实现了多线程的代码框架,可供数据处理使用。
多线程主要包括三个部分:
import multiprocessing import numpy as np from tqdm import tqdm class MultiProcess: def __init__(self, dataset=None): self.dataset = [[1, 1, 1]] * 100 def process(self, digits, fold="1by1"): # 处理函数:用于处理数据 data, para_id = digits print(para_id) num = 0 for i in tqdm(data): num += 1 # np.save('para_{}.npy'.format(para_id), data) # 对每个线程处理好的数据进行保存 def run(self): # 线程分配函数 n_cpu = multiprocessing.cpu_count() # 获得CPU核数 num = len(self.dataset) # 数据集样本数量 self.n_cpu = n_cpu print('cpu num: {}'.format(n_cpu)) chunk_size = int(num / n_cpu) # 分摊到每个CPU上的样本数量 procs = [] for i in range(0, n_cpu): min_i = chunk_size * i if i < n_cpu - 1: max_i = chunk_size * (i + 1) else: max_i = num digits = [self.dataset[min_i: max_i], i] # 每个线程唤醒并执行 procs.append(multiprocessing.Process(target=self.process, args=(digits, "parallel"))) for proc in procs: proc.start() for proc in procs: proc.join() def merge(self): # 数据合并函数:对每个线程上的处理好的数据进行合并 all_data = [] for i in range(self.n_cpu): data = np.load('para_{}.npy'.format(para_id), allow_pickle=True) all_data.append(data) return all_data if __name__ == '__main__': m = MultiProcess() m.run() # 多线程 # m.merge() # 对每个线程数据进行合并
测试机器为8核,测试效果如下:
读者可自行修改类中函数,实现更为复杂的功能。