1. 在Python程序中,每个类只能有一个构造器,就是__init__方法。
2. 通过@classmethod机制,可以用一种与构造器相仿的方式来构造类的对象。
3. 通过类方法的多态机制,可以用更通用的方式构建并连接具体的子类。
下面以上述三点为核心,构造一个MapReduce流程。首先定义一个用于读取数据的类:
class InputData:
    """Base class for an input source; subclasses are expected to supply ``read``."""

    def read(self):
        """Return this input's data. The base class leaves it unimplemented."""
        raise NotImplementedError
class PathInputData(InputData):
    """Input source backed by a file path."""

    def __init__(self, path):
        """Remember ``path``; the file is only opened when ``read`` is called."""
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        # The context manager closes the handle as soon as the read finishes
        # instead of leaving the open file to the garbage collector.
        with open(self.path) as handle:
            return handle.read()
下面实现一个工作类处理数据:
class Worker:
    """Base class defining the standard interface for processing one input."""

    def __init__(self, input_data):
        """Keep the input to process; ``result`` starts out as ``None``."""
        self.input_data, self.result = input_data, None

    def map(self):
        """Process ``self.input_data``. Left unimplemented on the base class."""
        raise NotImplementedError

    def reduce(self, other):
        """Combine ``other`` into this worker. Left unimplemented on the base class."""
        raise NotImplementedError
class LineCountWorker(Worker):
    """Concrete worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        self.result = self.input_data.read().count('\n')

    def reduce(self, other):
        """Add ``other.result`` onto ``self.result``."""
        self.result += other.result
接下来把两个类拼装起来:
import os
from threading import Thread
def generate_inputs(data_dir):
    """Lazily yield one ``PathInputData`` for every entry listed in ``data_dir``."""
    entry_paths = (os.path.join(data_dir, entry) for entry in os.listdir(data_dir))
    for entry_path in entry_paths:
        yield PathInputData(entry_path)
def create_workers(input_list):
    """Return a list holding one ``LineCountWorker`` per item of ``input_list``."""
    return [LineCountWorker(source) for source in input_list]
def execute(workers):
    """Run every worker's ``map`` in its own thread, then fold the results.

    Args:
        workers: Iterable of objects exposing ``map()``, ``reduce(other)``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has been
        reduced into it, or ``None`` when ``workers`` is empty.
    """
    workers = list(workers)
    if not workers:
        # No first worker to aggregate into.
        return None

    # The list must carry the same name the loops below iterate over;
    # binding it to ``thread`` left ``threads`` undefined.
    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map call to finish before any reduce call is made.
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)  # aggregate this worker into the first one
    return first.result
def mapreduce(data_dir):
    """Build inputs and workers for ``data_dir`` and return what ``execute`` yields."""
    return execute(create_workers(generate_inputs(data_dir)))
if __name__ == '__main__':
    import sys

    # ``data_dir`` is not defined anywhere in the script, so it is taken
    # from the command line rather than failing with NameError.
    if len(sys.argv) != 2:
        sys.exit('usage: %s DATA_DIR' % sys.argv[0])
    data_dir = sys.argv[1]
    mapreduce(data_dir)
以上实现思路是可行的,但不够通用:generate_inputs、create_workers和mapreduce直接写死了PathInputData与LineCountWorker。如果换用其他InputData或Worker子类,这三个函数都要跟着改写。
接下来引入@classmethod形式的多态,它针对的是整个类,而不是该类构建的某个对象。
首先从修改InputData类开始,使其变成通用的:
class GenericInputData:
    """Base class for input sources that can also enumerate their own instances."""

    @classmethod
    def generate_inputs(cls, config):
        """Produce input instances from ``config``. Unimplemented on the base class."""
        raise NotImplementedError

    def read(self):
        """Return this input's data. Unimplemented on the base class."""
        raise NotImplementedError
class PathInputData(GenericInputData):
    """Input source backed by a file path."""

    def __init__(self, path):
        """Remember ``path``; the file is only opened when ``read`` is called."""
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        # The context manager closes the handle as soon as the read finishes
        # instead of leaving the open file to the garbage collector.
        with open(self.path) as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one instance of ``cls`` per entry of ``config['data_dir']``.

        Args:
            config: Mapping that must contain the key ``'data_dir'``, the
                directory whose entries are listed.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            # ``cls`` rather than a hard-coded class, so subclasses yield
            # instances of themselves.
            yield cls(os.path.join(data_dir, name))
class GenericWorker:
    """Base class for workers that can build themselves from an input class."""

    def __init__(self, input_data):
        """Keep the input to process; ``result`` starts out as ``None``."""
        self.input_data = input_data
        self.result = None

    def map(self):
        """Process ``self.input_data``. Unimplemented on the base class."""
        raise NotImplementedError

    def reduce(self, other):
        """Combine ``other`` into this worker. Unimplemented on the base class.

        The ``other`` parameter matches how ``reduce`` is invoked: with one
        worker argument.
        """
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` instance per generated input.

        Args:
            input_class: Class exposing ``generate_inputs(config)``.
            config: Passed through unchanged to ``generate_inputs``.
        """
        return [cls(input_data) for input_data in input_class.generate_inputs(config)]

    # Backward-compatible alias: the method was previously spelled
    # ``create_worker`` while being called as ``create_workers``.
    create_worker = create_workers
class LineCountWorker(GenericWorker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        self.result = self.input_data.read().count('\n')

    def reduce(self, other):
        """Add ``other.result`` onto ``self.result``."""
        self.result += other.result
def execute(workers):
    """Run every worker's ``map`` in its own thread, then fold the results.

    Args:
        workers: Iterable of objects exposing ``map()``, ``reduce(other)``
            and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after every other worker has been
        reduced into it, or ``None`` when ``workers`` is empty.
    """
    workers = list(workers)
    if not workers:
        # No first worker to aggregate into.
        return None

    # The list must carry the same name the loops below iterate over;
    # binding it to ``thread`` left ``threads`` undefined.
    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map call to finish before any reduce call is made.
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)  # aggregate this worker into the first one
    return first.result
def mapreduce(worker_class, input_class, config):
    """Create workers of ``worker_class`` fed by ``input_class`` and run them.

    ``config`` is handed unchanged to ``worker_class.create_workers``; the
    return value is whatever ``execute`` returns for the created workers.
    """
    return execute(worker_class.create_workers(input_class, config))
if __name__ == '__main__':
    import sys

    # ``config`` is not defined anywhere in the script, so it is built from
    # the command line rather than failing with NameError.
    if len(sys.argv) != 2:
        sys.exit('usage: %s DATA_DIR' % sys.argv[0])
    # PathInputData.generate_inputs looks the directory up under this key.
    config = {'data_dir': sys.argv[1]}
    mapreduce(LineCountWorker, PathInputData, config)
这样一来,要支持新的处理逻辑,只需编写新的GenericInputData或GenericWorker子类并传给mapreduce,而mapreduce和execute本身无需改动。