// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @Public @Stable public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Mapper() { } protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { context.write(key, value); } protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKeyValue()) { this.map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { this.cleanup(context); } } public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context() { } } }
Mapper类主要有五种方法:
setup
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)map
(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)cleanup
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)run
(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)Context
implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { }
初始时调用,用来加载一些初始化的工作,像全局文件、建立数据库的链接等等,执行一次。
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { }
结束时调用,收尾工作,如关闭文件、关闭数据库连接、执行map()后的键值分发等等,执行一次。
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { context.write(key, value); }
它继承Mapper类,是一个需要重点重写的方法,将map工作的处理逻辑都要放在这个方法中。
map方法有三个参数,Mapper有四个泛型变量KEYIN, VALUEIN, KEIOUT, VALUEOUT
,它们分别代表输入的键值类型和输出的键值类型,context是上下文,用于保存Job的配置信息、状态和map处理结果,用于最后的write方法。
map中有重要的四个方法
context.nextKeyValue();
context.getCurrentKey();
context.getCurrentValue();
context.write(key,value);
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKeyValue()) { this.map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { this.cleanup(context); } }
run方法中先执行setup
函数,然后是用map
处理数据,当处理完数据后用cleanup
的收尾工作。值得注意的是,setup函数和cleanup函数由系统作为回调函数只做一次,并不像map函数那样执行多次。
实现的是设计模式中的模板方法模式。Mapper类中定义了一些方法,用户可以继承这个类重写方法以应对不同的需求,但是这些方法内部的执行顺序是确定好的。它封装了程序的算法,让用户能集中精力处理每一部分的具体逻辑。
run方法在程序执行中会默认调用,从他的执行流程来看也给常符合我们的预期,先进行初始化,如果还有输入的数据,那么调用map方法处理每一个键值对,最终执行结束方法。
模板方法模式:
模板模式中分基本方法和模板方法,上述的run可看做基本方法,需要调用其它方法实现整体逻辑,不会被修改,其它几个方法可看做模板方法。基本方法一般会用final修饰,保证其不会被子类修改,而模板方法使用protect修饰,表示需要在子类中实现。
此外,模板模式中还有一个叫钩子方法的概念,即给子类一个授权,允许子类通过重写钩子方法来颠覆基本逻辑的执行,这有时候是非常有用的,可以用来完善具体的逻辑。
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context() { } }
Context类,它实现了MapContext接口,而MapContext继承于TaskInputOutputContext
参考资料:
hadoop之mapper类妙用
Hadoop学习笔记(六)实战wordcount
Java设计模式中的模板模式