"""Count the newline characters in every file of a directory with a
thread-based MapReduce (input and worker types are hard-wired)."""
from threading import Thread
import os


class InputData(object):
    """Abstract source of input text for a worker."""

    def read(self):
        """Return the full text of this input; subclasses must override."""
        raise NotImplementedError


class PathInputData(InputData):
    """Input data backed by a file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the file's contents decoded as UTF-8.

        The ``with`` block closes the handle as soon as the text is read
        instead of leaving it open until garbage collection.
        """
        with open(self.path, encoding='utf8') as f:
            return f.read()


class Worker(object):
    """Abstract map/reduce worker bound to a single input."""

    def __init__(self, input_data):
        self.input_data = input_data
        self.result = 0

    def map(self):
        """Process ``self.input_data`` and store the outcome in ``self.result``."""
        raise NotImplementedError

    def reduce(self, other):
        """Fold ``other.result`` into ``self.result``."""
        raise NotImplementedError


class LineCountWorker(Worker):
    """Worker that counts the ``'\\n'`` characters in its input."""

    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        self.result += other.result


def generate_inputs(data_dir):
    """Yield a ``PathInputData`` for each regular file directly in *data_dir*.

    Sub-directories are skipped because ``open`` cannot read them.
    """
    for name in os.listdir(data_dir):
        path = os.path.join(data_dir, name)
        if os.path.isfile(path):
            yield PathInputData(path)


def create_workers(input_list):
    """Return one ``LineCountWorker`` per item of the iterable *input_list*."""
    # Debug output: shows the repr of the iterable (a generator object when
    # called from mapreduce).
    print('input_list', input_list)
    return [LineCountWorker(input_data) for input_data in input_list]


def execute(workers):
    """Run each worker's ``map`` in its own thread, then reduce into the first.

    Returns the combined ``result`` of all workers, or 0 when *workers* is
    empty.

    NOTE: an exception raised inside a worker's ``map`` is reported by its
    thread but not re-raised here; that worker's ``result`` stays at 0.
    """
    if not workers:
        return 0
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


def mapreduce(data_dir):
    """Return the total number of newline characters across the files in *data_dir*."""
    inputs = generate_inputs(data_dir)
    workers = create_workers(inputs)
    return execute(workers)


if __name__ == '__main__':
    path = "D:\\python3.64\\CorePython\\chapter-6"
    result = mapreduce(path)
    print('There are', result, 'lines')
输出:
input_list <generator object generate_inputs at 0x00000278722C7BF8>
There are 83 lines
# Generic version of the MapReduce pipeline: the caller passes in the input
# class and the worker class, and @classmethod factories build the objects,
# so mapreduce() no longer names any concrete class.
from threading import Thread
import os


class GenericInputData(object):
    """Abstract input source with a class-level factory for its instances."""

    def read(self):
        """Return the full text of this input; subclasses must override."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Yield instances of *cls* built from the *config* dict; subclasses must override."""
        raise NotImplementedError


class PathInputData(GenericInputData):
    """Input data backed by a file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path

    def read(self):
        """Return the file's contents decoded as UTF-8.

        The ``with`` block closes the handle as soon as the text is read
        instead of leaving it open until garbage collection.
        """
        with open(self.path, encoding='utf-8') as f:
            return f.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one *cls* instance per regular file directly in ``config['data_dir']``.

        Sub-directories are skipped because ``open`` cannot read them.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            path = os.path.join(data_dir, name)
            if os.path.isfile(path):
                yield cls(path)


class GenericWorker(object):
    """Abstract map/reduce worker with a class-level factory."""

    def __init__(self, input_data):
        self.input_data = input_data
        self.result = 0

    def map(self):
        """Process ``self.input_data`` and store the outcome in ``self.result``."""
        raise NotImplementedError

    def reduce(self, other):
        """Fold ``other.result`` into ``self.result``."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one *cls* worker per input produced by
        ``input_class.generate_inputs(config)``."""
        return [cls(input_data)
                for input_data in input_class.generate_inputs(config)]


class LineCountWorker(GenericWorker):
    """Worker that counts the ``'\\n'`` characters in its input."""

    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        self.result += other.result


def execute(workers):
    """Run each worker's ``map`` in its own thread, then reduce into the first.

    Returns the combined ``result`` of all workers, or 0 when *workers* is
    empty.

    NOTE: an exception raised inside a worker's ``map`` is reported by its
    thread but not re-raised here; that worker's ``result`` stays at 0.
    """
    if not workers:
        return 0
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


# Finally, rewrite mapreduce so that it is completely generic.
def mapreduce(worker_class, input_class, config):
    """Build workers of *worker_class* over inputs of *input_class* (configured
    by *config*), run them, and return the reduced result."""
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)


# Running the revised MapReduce on the test files gives the same result as the
# original implementation. The difference is that mapreduce() now takes more
# parameters so that it can operate on the related objects more generically.
if __name__ == '__main__':
    path = "D:\\python3.64\\CorePython\\chapter-6"
    config = {'data_dir': path}
    result = mapreduce(LineCountWorker, PathInputData, config)
    print('There are', result, 'lines')
输出:
There are 83 lines