把骨架(这个骨架就用通用算法进行抽象出来)定义好,具体实现交给子类去实现。
意思是说在模板里面只要把过程给定义好,具体怎么实现,这个模板方法是不关注的,具体的实现是又子类来完成的,可以有多个子类,每个子类实现的功能可以都不一样。
定义一个模板类:
package com.ruozedata.pattern.template; public abstract class Mapper { //setUp mapper clearUp三个方法都是抽象方法 //后面会用具体的子类来实现它 /** * 初始化的操作 : 打开冰箱门 */ abstract void setUp(); /** * 具体的业务逻辑 : 把大象、狗、猪等放进去 */ abstract void mapper(); /** * 资源释放的操作 : 关上冰箱门 */ abstract void clearUp(); /** * 定义我们的模板方法的执行流程 * 这个run方法,会调用前面的方法,定义执行顺序:初始化、执行、结束 */ public void run(){ setUp(); mapper(); clearUp(); } }
定义一个子类,来实现模板抽象类里的抽象方法:
package com.ruozedata.pattern.template; public class SubMapper extends Mapper{ void setUp() { System.out.println("SubMapper.setUp"); } void mapper() { System.out.println("SubMapper.mapper"); } void clearUp() { System.out.println("SubMapper.clearUp"); } }
再定义一个子类,来实现模板抽象类里的抽象方法,和上面的子类类似,但是可以实现不同的功能:
package com.ruozedata.pattern.template; public class SubMapper2 extends Mapper{ void setUp() { System.out.println("SubMapper2.setUp"); } void mapper() { System.out.println("SubMapper2.mapper"); } void clearUp() { System.out.println("SubMapper2.clearUp"); } }
再定义一个类,去调用上面的已经实现好的子类:
package com.ruozedata.pattern.template; public class Client { public static void main(String[] args) { SubMapper subMapper = new SubMapper(); subMapper.run(); SubMapper2 subMapper2 = new SubMapper2(); subMapper2.run(); } }
运行结果:
SubMapper.setUp SubMapper.mapper SubMapper.clearUp SubMapper2.setUp SubMapper2.mapper SubMapper2.clearUp
词频统计就是给你一个或者一批文件,让你统计每个单词出现的次数。
当拿到一个功能的时候,千万不要想着代码怎么写,而是要进行功能、需求的分析:用中文描述清楚这个事情是做什么的 1 2 3 4等步骤,把步骤写清楚。然后再是开发:把1234翻译成代码而已。上层的架构,包括上面每个步骤用什么技术框架去实现,这才是重要的。所以思路一定要理清楚。
现在分析这个:使用HDFS API来完成词频(wordcount wc)统计
另外大数据里的东西就是三段论:
1.输入
2.处理
3.输出
所有的都是上面的流程。
对上面进行功能拆解:
第一步:输入:要去使用HDFS 的 API去读取文件;
第二步:处理:词频
1.读文件进来的内容是一行一行的,按照某个指定的分隔符进行行内容的拆分,就变成一堆单词了
2.给每个单词赋上出现的次数为1,如下:
比如这一行单词: wc,word,hello,word ,按照逗号分割,每个单词赋上出现的次数为1
(wc,1)
(word,1)
(hello,1)
(word,1)
上面每个单词出现的次数都是1,我们要的是每个单词出现的总次数,那么怎么给它们累加起来?
3.把上面的分割后的单词放到一个缓存中,如放到map中,map<单词,次数>,当word出现一次的时候是map<word,1>,当出现两次的时候就是map<word,2>,这个map就是缓存。
4.把这个map的缓存中的内容遍历处理 这个就是 词频。
第三步:输出 可以按照你想输出的地方进行输出
1.打印到本地
2.写到本地文件系统
3.写到HDFS文件系统
4.写到MySQL数据库…
上面的骨架已经定义好,下面来进行实现。
1.首先定义一个抽象类或者接口Mapper,只是定义了功能,但并不关注具体怎么实现。
package com.ruozedata.hadoop.hdfs; public interface Mapper { /** * map 一一操作 对每个元素进行操作 * 现在读进来是一行数据,对读进来的每行数据进行操作 */ public void map(String line,Context context); }
这个接口,只是定义了一个map方法,它的功能是传进来一行数据line,中间处理过程数据以及结果数据会放在context缓存中,所以可以理解line是一行数据,context是一个缓存,临时存放数据的一个东西。
2.定义这个缓存Context
这个缓存Context ,它有一个cacheMap 对象,这个对象是一个HashMap实例,有两个参数,第一个是key,第二个是value,可以理解为可以存放<key,value>的数据,就是缓存。
代码如下:
package com.ruozedata.hadoop.hdfs; import java.util.HashMap; import java.util.Map; public class Context { private Map<Object,Object> cacheMap = new HashMap<Object, Object>(); //get方法 public Map<Object, Object> getCacheMap() { return cacheMap; } /** * set方法 * 把数据写入到缓存中 * @param key 单词 * @param value 次数 */ public void write(Object key, Object value) { cacheMap.put(key, value); } /** * 从缓存中获取单词对应的次数 * @param key 单词 * @return 次数 */ public Object get(Object key) { return cacheMap.get(key); } }
3.定义一个类WordCountMapper ,来实现上面的接口Mapper,具体怎么实现是由WordCountMapper来完成。
传进去一行数据和一个缓存,把这行数据按照空格进行分割,分割后是一个数组,然后对这个数组进行遍历,然后根据key的值去缓存中看看有没有对应的value,如果没有,则把这个单词,也就是key放进缓存中,并且value值给1。如果有这个单词key,那么把对应的value取出来再加1,然后再放进缓存中去。
package com.ruozedata.hadoop.hdfs; public class WordCountMapper implements Mapper { public void map(String line, Context context) { String[] splits = line.split(" "); for (String word : splits) { Object value = context.get(word); if (null == value){ //单词不存在的情况 context.write(word,1); } else { //单词存在的情况 先把读取出来的值加1,去然后再写进去 context.write(word,Integer.parseInt(value.toString()) + 1); } } } }
4.定义一个类HDFSWCAPI01,读取文件,处理数据,然后输出。
定义一个configuration,然后给它配置相关的hdfs地址等,然后定义一个fileSystem,有了configuration,就有了fileSystem入口。
package com.ruozedata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import javax.swing.plaf.synth.ColorType; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Map; public class HDFSWCAPI01 { public static void main(String[] args) throws Exception{ //Configuration和FileSystem Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://hadoop001:9000"); configuration.set("dfs.replication","1"); System.setProperty("HADOOP_USER_NAME","ruoze"); FileSystem fileSystem = FileSystem.get(configuration); //读取数据 input Path input = new Path("/hdfsapi/test3/"); WordCountMapper mapper = new WordCountMapper(); Context context = new Context(); //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件 RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true); //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加 while (iterator.hasNext()){ LocatedFileStatus status = iterator.next(); FSDataInputStream in = fileSystem.open(status.getPath()); BufferedReader read = new BufferedReader(new InputStreamReader(in)); String line = ""; while ((line = read.readLine()) != null){ System.out.println(line); mapper.map(line,context); } read.close(); in.close(); //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。 Map<Object, Object> cacheMap = context.getCacheMap(); for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) { System.out.println(entry.getKey() + "\t" + entry.getValue()); } } } }
如果忽略单词大小写去统计wc,只需要把上面的WordCountMapper的复制一份,CaseIgnoreWordCountMapper只需要加上line.toLowerCase()。
package com.ruozedata.hadoop.hdfs; public class CaseIgnoreWordCountMapper implements Mapper { public void map(String line, Context context) { String[] splits = line.toLowerCase().split(" "); for (String word : splits) { Object value = context.get(word); if (null == value){ //单词不存在的情况 context.write(word,1); } else { //单词存在的情况 先把读取出来的值加1,去然后再写进去 context.write(word,Integer.parseInt(value.toString()) + 1); } } } }
然后加入了Mapper mapper = new CaseIgnoreWordCountMapper();进行调用,会忽略大小写,这个也是多态的使用。
package com.ruozedata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Map; public class HDFSWCAPI01 { public static void main(String[] args) throws Exception{ //Configuration和FileSystem Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://hadoop001:9000"); configuration.set("dfs.replication","1"); System.setProperty("HADOOP_USER_NAME","ruoze"); FileSystem fileSystem = FileSystem.get(configuration); //读取数据 input Path input = new Path("/hdfsapi/test3/"); //WordCountMapper mapper = new WordCountMapper(); --这个不会忽略大小写 //下面这个加了Mapper,为多态的使用,如果把Mapper换成CaseIgnoreWordCountMapper,则不是 //CaseIgnoreWordCountMapper这个会忽略大小写 Mapper mapper = new CaseIgnoreWordCountMapper(); Context context = new Context(); //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件 RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true); //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加 while (iterator.hasNext()){ LocatedFileStatus status = iterator.next(); FSDataInputStream in = fileSystem.open(status.getPath()); BufferedReader read = new BufferedReader(new InputStreamReader(in)); String line = ""; while ((line = read.readLine()) != null){ System.out.println(line); mapper.map(line,context); } read.close(); in.close(); System.out.println("\n\n"); //TODO... 后面可以考虑把结果写入到hdfs的某个文件中去 //Path result = new Path("/hdfsapi/result/result.txt"); //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。 Map<Object, Object> cacheMap = context.getCacheMap(); for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) { System.out.println(entry.getKey() + "\t" + entry.getValue()); } } } }
上面的写法还不够灵活,可以通过配置文件的形式进行改造,把需要的东西放在配置文件中,需要什么读什么,读进来之后通过反射的方式进行处理。
1.在resources中创建一个文件:wc.properties
INPUT_PATH=/hdfsapi/test3/ OUTPUT_PATH=/hdfsapi/result/ HDFS_URI=hdfs://hadoop001:9000 ##具体用哪个子类来实现,写这个类,底层要用反射才能实现 MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper
2.创建一个工具类ParamsUtils去读取上面的配置文件
package com.ruozedata.hadoop.hdfs; import java.io.IOException; import java.util.Properties; public class ParamsUtils { private static Properties properties = new Properties(); static { try { properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties")); } catch (IOException e) { e.printStackTrace(); } } //get方法 public static Properties getProperties() { return properties; } public static void main(String[] args) { System.out.println(getProperties().getProperty("MAPPER_CLASS")); System.out.println(getProperties().getProperty("INPUT_PATH")); } }
上面这个getProperties().getProperty(“MAPPER_CLASS”)都是写死的,可以用一个常量类来封装一下,不封装也可以。
package com.ruozedata.hadoop.hdfs; public class Constants { public static final String INPUT_PATH = "INPUT_PATH"; public static final String OUTPUT_PATH = "OUTPUT_PATH"; public static final String HDFS_URI = "HDFS_URI"; public static final String MAPPER_CLASS = "MAPPER_CLASS"; }
然后再调用常量类:
package com.ruozedata.hadoop.hdfs; import java.io.IOException; import java.util.Properties; public class ParamsUtils { private static Properties properties = new Properties(); static { try { properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties")); } catch (IOException e) { e.printStackTrace(); } } //get方法 public static Properties getProperties() { return properties; } public static void main(String[] args) { // System.out.println(getProperties().getProperty("MAPPER_CLASS")); // System.out.println(getProperties().getProperty("INPUT_PATH")); System.out.println(getProperties().getProperty(Constants.MAPPER_CLASS)); System.out.println(getProperties().getProperty(Constants.INPUT_PATH)); System.out.println(getProperties().getProperty(Constants.HDFS_URI)); System.out.println(getProperties().getProperty(Constants.OUTPUT_PATH)); } }
输出结果:
com.ruozedata.hadoop.hdfs.WordCountMapper /hdfsapi/test3/ hdfs://hadoop001:9000 /hdfsapi/result/
最后再进行测试,HDFSWCAPI02 再HDFSWCAPI01基础上只是修改了通过properties 拿到input,以及通过反射拿到MAPPER_CLASS类,其他都不需要修改。
package com.ruozedata.hadoop.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.Map; import java.util.Properties; public class HDFSWCAPI02 { public static void main(String[] args) throws Exception{ //拿到配置 Properties properties = ParamsUtils.getProperties(); Path input = new Path(properties.getProperty(Constants.INPUT_PATH)); //Configuration和FileSystem Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://hadoop001:9000"); configuration.set("dfs.replication","1"); System.setProperty("HADOOP_USER_NAME","ruoze"); FileSystem fileSystem = FileSystem.get(configuration); //因为上面有了input,所以这里不需要了 // //读取数据 input // Path input = new Path("/hdfsapi/test3/"); //因为配置文件中有MAPPER_CLASS,在下面会用到,所以在这里需要通过反射把它给拿出来 Class<?> aClass = Class.forName(properties.getProperty(Constants.MAPPER_CLASS)); //然后通过aClass反射,去new一个instance:aClass.newInstance() //因前面Class<?> 的类型是不明确的,所以在这里 // 需要在aClass.newInstance()前面加一个强制转换(Mapper)成Mapper类型,就是转成它的父类 Mapper mapper = (Mapper) aClass.newInstance(); Context context = new Context(); //远程迭代,路径可能是一个文件或者文件夹,文件夹下面可能有多个文件包括子文件夹下面的文件 RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(input, true); //如果有多个文件,则iterator.next为一个文件,每次循环一个文件,但是context缓存中会一直累加 while (iterator.hasNext()){ LocatedFileStatus status = iterator.next(); FSDataInputStream in = fileSystem.open(status.getPath()); BufferedReader read = new BufferedReader(new InputStreamReader(in)); String line = ""; while ((line = read.readLine()) != null){ System.out.println(line); mapper.map(line,context); } read.close(); in.close(); System.out.println("\n\n"); //TODO... 后面可以考虑把结果写入到hdfs的某个文件中去 //Path result = new Path("/hdfsapi/result/result.txt"); //从context缓存中获取数据,把cacheMap中的<key,value>循环读取完。 Map<Object, Object> cacheMap = context.getCacheMap(); for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) { System.out.println(entry.getKey() + "\t" + entry.getValue()); } } } }
这样的话所有的输入的东西都在配置文件中完成,包括用哪个类也指定好了,上面统计是区分大小写的,如果要调用区分大小写的类,只需要在wc.properties中把:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.WordCountMapper
修改成即可:
MAPPER_CLASS=com.ruozedata.hadoop.hdfs.CaseIgnoreWordCountMapper