知识梳理不易,请尊重劳动成果,文章仅发布在CSDN网站上,在其他网站看到该博文均属于未经作者授权的恶意爬取信息
相当于实现SQL里面的distinct的功能。废话不说多,直接进行代码编程,创建一个demo.distinct的package,然后进行框架的搭建(框架里面包含Mapper、Reducer和执行的主程序三个文件),如下
首先开发Mapper程序,具体的代码如下(基本上之前都见过了,没啥区别)
package demo.distinct; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // k1 v1 k2:job v2:null public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 数据:7369,SMITH,CLERK,7902,1980/12/17,800,0,20 String data = v1.toString(); //分词 String[] words = data.split(","); //输出 context.write(new Text(words[2]), NullWritable.get()); } }
其次就是开发Reduce程序,只需要指定一下数据类型然后写入数据,不需要进行其它的操作
package demo.distinct; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text k3, Iterable<NullWritable> v3,Context context) throws IOException, InterruptedException { context.write(k3, NullWritable.get()); } }
接着就是执行的主程序,将原来的程序直接拿过来进行改写(还是修改之前圈出来的三个红框部分,分别对应下面的(1)(2)(3)中的内容)
package demo.distinct; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DistinctMain { public static void main(String[] args) throws Exception { // (1)创建任务Job,并且制定任务的入口 Job job = Job.getInstance(new Configuration()); job.setJarByClass(DistinctMain.class); //指定为当前程序 //(2)指定任务的Map,Map的输出类型 job.setMapperClass(DistinctMapper.class); job.setMapOutputKeyClass(Text.class);//k2 job.setMapOutputValueClass(NullWritable.class);//v2 //(3)指定任务的Reduce,Reduce的输出类型 job.setReducerClass(DistinctReducer.class); job.setOutputKeyClass(Text.class);//k4 job.setOutputValueClass(NullWritable.class);//v4 //(4)指定任务的输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //(5)执行任务 job.waitForCompletion(true); //表示执行的时候打印日志 } }
程序开发完成后打包为p12.jar,上传到hadoop上运行
核实生成的文件数据信息是否去除了重复的信息(核实无误,啦啦啦~)
补充一些关于SQL中的表连接的知识:
关于笛卡尔积,比如部门号有2条记录,然后员工表有4条记录,如果两表进行笛卡尔积,最后就是2x4=8条记录,如下
但是可以发现笛卡尔积的结果有些内容根本就是错的,不是我们所需要的,因此我们真正需要的是要有一定依据的连接,比如等值连接和自连接,还有外连接等
查询员工信息,显示:员工号、姓名,薪水,部门名称(下面是Sql语句实现)
select e.ename,d.dname from emp e,dept d where e.deptno=d.deptno;
在MapReduce中实现,首先要分析一些等值连接的一个过程,理清楚里面每一步的数据类型和步骤,写起来就很方便了。MapReduce:分析等值连接数据处理的流程
编程实现等值连接,创建一个名为demo.equal的package,然后搭建框架,还是三个文件(Mapper程序、Reducer程序和运行主程序)
首先开发Mapper程序,就是先按照之前分析的流程中指定一下数据类型,接着解决上面问题,就是判断是员工表数据还是部门表数据,然后对于部门表中的Text数据进行*号标记,用于区别部门和员工名称
package demo.equaljoin; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // k1 v1 k2 部门号 v2 public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //部门表:10,,ACCOUNTING,NEW YORK //员工表:7369,SMITH,CLERK,7902,1980/12/17,800,0,20 String data = v1.toString(); //分词 String[] words = data.split(","); //判断 if(words.length == 3) { //部门表:部门号和部门名称 context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1])); }else { //员工表:部门号,员工姓名 context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1])); } } }
Reducer程序开发的代码稍微有点复杂,就是需要设置两个空的字符,然后对Mapper输出的数据,进行遍历,然后根据是否有*号进行分类,重新写到部门号和员工姓名字段中
package demo.equaljoin; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> { @Override protected void reduce(IntWritable k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException { // 定义变量,分别保存部门名称和员工姓名 String dname = ""; String empNameList = ""; for(Text v:v3) { String str = v.toString(); //判断是否包含*号 int index = str.indexOf("*"); if(index >= 0) { //是部门名称 dname = str.substring(1); }else { //是员工姓名 empNameList = str + ";" + empNameList; } } //输出 context.write(new Text(dname), new Text(empNameList)); } }
接着就是运行的主程序,还是修改(1)(2)(3)中的内容,其余的保持不变
package demo.equaljoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class EqualJoinMain { public static void main(String[] args) throws Exception { // (1)创建任务Job,并且制定任务的入口 Job job = Job.getInstance(new Configuration()); job.setJarByClass(EqualJoinMain.class); //指定为当前程序 //(2)指定任务的Map,Map的输出类型 job.setMapperClass(EqualJoinMapper.class); job.setMapOutputKeyClass(IntWritable.class);//k2 job.setMapOutputValueClass(Text.class);//v2 //(3)指定任务的Reduce,Reduce的输出类型 job.setReducerClass(EqualJoinReducer.class); job.setOutputKeyClass(Text.class);//k4 job.setOutputValueClass(Text.class);//v4 //(4)指定任务的输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //(5)执行任务 job.waitForCompletion(true); //表示执行的时候打印日志 } }
程序开发完成后,打包为p13.jar文件,上传至hadoop上运行
核实生成文件中的数据信息(左侧为部门表,右侧为员工表,成功实现了)
通过表的别名,将同一张表视为多张表,查询员工信息,显示:老板名称、员工姓名(Sql语句查询如下)
select b.ename,e.ename from emp e,emp b where e.mgr=b.empno;
在MapReduce中实现自连接,首先梳理一下这个过程,理清数据类型和步骤,图示如下
编程实现等值连接,创建一个名为demo.selfjoin的package,然后搭建框架,还是三个文件(Mapper程序、Reducer程序和运行主程序)
首先开发Mapper程序,就是先按照之前分析的流程中指定一下数据类型,对于数据的写入要进行两次,表格同时作为员工表和老板表,这里还有有防错的处理,因为数据中存在一个大boss,他上面是没有老板的,这个数据是空,所以如果遇到这条数据,就把他的老板标记为-1,这样就识别除了大boss
package demo.selfjoin; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 员工表:7369,SMITH,CLERK,7902,1980/12/17,800,0,20 String data = v1.toString(); //分词 String[] words = data.split(","); //输出 try { //作为老板表:员工号 姓名 context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1])); //作为员工表: 老板号 姓名 context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1])); }catch (Exception e) { // 表示大老板 context.write(new IntWritable(-1), new Text(words[1])); } } }
Reducer程序开发的代码这里比Mapper程序相较简单一点了,对比一下发现和前面的等值连接中的代码几乎一模一样,只是变量之间存在着差异
package demo.selfjoin; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> { @Override protected void reduce(IntWritable k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException { //定义两个变量:老板姓名 员工姓名 String bossName = ""; String empNameList = ""; for(Text v:v3) { String str = v.toString(); //判断是否有*号 int index = str.indexOf("*"); if(index >=0) { //表示老板姓名 bossName = str.substring(1); }else { empNameList = str + ";" + empNameList; } } context.write(new Text(bossName), new Text(empNameList)); } }
运行主程序的代码设计,将(1)(2)(3)中的类名称修改一下即可
package demo.selfjoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SelfJoinMain { public static void main(String[] args) throws Exception { // (1)创建任务Job,并且制定任务的入口 Job job = Job.getInstance(new Configuration()); job.setJarByClass(SelfJoinMain.class); //指定为当前程序 //(2)指定任务的Map,Map的输出类型 job.setMapperClass(SelfJoinMapper.class); job.setMapOutputKeyClass(IntWritable.class);//k2 job.setMapOutputValueClass(Text.class);//v2 //(3)指定任务的Reduce,Reduce的输出类型 job.setReducerClass(SelfJoinReducer.class); job.setOutputKeyClass(Text.class);//k4 job.setOutputValueClass(Text.class);//v4 //(4)指定任务的输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //(5)执行任务 job.waitForCompletion(true); //表示执行的时候打印日志 } }
程序设计完成后打包生成p14.jar文件,上传hadoop上进行
查看一下输出的文件中是够有什么问题不。直接输出的结果中并不是我们想要的结果,主要是因为,公司的人员架构中大boss上面没有人了,最底层的员工下面也没有人了,所以就造成输出的样式
对代码进行改进,判断如果存在老板和员工才进行输出(在Reducer程序中修改)
重新打包一下生成p15.jar文件,然后上传hadoop上运行
接下来就是见证奇迹的时候了,啦啦啦~(很完美,和想象中的输出一毛一样)
之前在介绍WordCount计数的时候就已经介绍过倒排索引的过程,如下
接下来就是用编程的方式自己写代码实现一下倒排索引。准备测试数据,在temp文件夹下创建三个文件,内容分别如下
vi data01.txt I love Beijing and love Shanghai vi data02.txt I love China vi data03.txt Beijing is the capital of China
检验创建的数据,核实无误
然后将创建的数据上传到hdfs上,代码指令:hdfs dfs -put data0*.txt /indexdata
那么就是用MapReduce实现倒排索引,首先要分析一下这个过程的数据类型和步骤,如下
先开始设计Mapper程序,里面的关键就是获取文件名称,然后在进行字符串的切割求解得到(注意仿照预设定的格式进行获取/存取数据)
package demo.revertedindex; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /这个包别导错了 import org.apache.hadoop.mapreduce.Mapper; public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 数据:data01.txt I love Beijing and love Shanghai //获取输入数据的路径: /indexdata/data01.txt String path = ((FileSplit)context.getInputSplit()).getPath().toString(); //查询最后一个斜线 int index = path.indexOf("/"); //得到文件名 String fileName = path.substring(index+1); String data = v1.toString(); //分词 String[] words = data.split(","); //输出 for(String w:words) { context.write(new Text(w+":"+fileName), new Text("1")); } } }
接着处理Combiner程序,跟着分析的流程一步步进行就可以了
package demo.revertedindex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text k21, Iterable<Text> v21, Context context) throws IOException, InterruptedException { // 对v21求和,得到某个单词在某个文件中频率 int total = 0; for(Text v:v21) { total = total + Integer.parseInt(v.toString()); } //k21的数据是:love:data01.txt String data = k21.toString(); int index = data.indexOf(":"); String word = data.substring(0,index); String fileName = data.substring(index+1); // love data01.txt:2 context.write(new Text(word), new Text(fileName+":"+total)); } }
然后就是处理Reducer程序中的内容,对于Combiner传输的数据,进行遍历循坏,依次转化为目标的格式,最后再输出
package demo.revertedindex; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class RecertedIndexReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException { //对combiner的输出结果进行拼加 String str = ""; for(Text v:v3) { str = "(" + v.toString()+")" + str; } context.write(k3, new Text(str)); } }
最后就是编写运行主程序的代码,把之前的代码拿过来修改一下即可(还需要添加中间的Combiner)
package demo.revertedindex; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RevertedIndexMain { public static void main(String[] args) throws Exception { // (1)创建任务Job,并且制定任务的入口 Job job = Job.getInstance(new Configuration()); job.setJarByClass(RevertedIndexMain.class); //指定为当前程序 //(2)指定任务的Map,Map的输出类型 job.setMapperClass(RevertedIndexMapper.class); job.setMapOutputKeyClass(Text.class);//k2 job.setMapOutputValueClass(Text.class);//v2 //引入Combiner job.setCombinerClass(RevertedIndexCombiner.class); //(3)指定任务的Reduce,Reduce的输出类型 job.setReducerClass(RevertedIndexReducer.class); job.setOutputKeyClass(Text.class);//k4 job.setOutputValueClass(Text.class);//v4 //(4)指定任务的输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //(5)执行任务 job.waitForCompletion(true); //表示执行的时候打印日志 } }
程序全部开发完成后,打包为p16.jar,上传至hadoop上运行(这次是在map阶段有了三步)
核实一下生成的文件中的信息(最后的内容输出格式上貌似和原来设想的很不一样,因此肯定是中间的数据处理出问题了)
接着就是找问题,发现在Mapper程序中有两行代码大意了,习惯性的敲出来了(一个是最后的索引少了last,第二个就是分割的时候这次是空格不是逗号)
修改完毕后重新打包上传运行结果如下(很完美,这次和预期一样,啦啦啦~)
之前运行MapReduce程序都是程序开发完成之后,通过WinSCP软件将本地的jar包上传至hadoop上,然后再进行操作,但是这种方式并不是很方便,我们希望可以直接就在开发工具中运行测试(可以直接下载hadoop插件,不过这里还有一个MRUNIT框架就可以实现对MapReduce进行测试)
在工程项目中创建一个新的文件夹,命名为mrunit,将提供的资料中的相关jar都放置在该文件夹下,并添加环境,这样MRUNIT环境就配置完成了,接下来就可以直接进行测试
特别注意一下,添加环境的jar包有一个是和别的jar包有冲突,需要进行剔除(mockito-all-1.8.5.jar)
接着就是见证奇迹的时候啦,以之前写过的WordCount程序为例,测试一下能不能直接调试运行,创建一个新的package命名为demo.mrunit,然后将wc中的Mapper和Reducer程序直接拷贝过来,分别进行测试
创建一个新的Java Class命名为MRUnitWordCount,设置好测试的框架,就是Mapper程序,Reducer程序和Job运行程序
首先处理Mapper的测试,注意导入的MapDriver是在org.apache.hadoop.mrunit.mapreduce下
@Test public void testMapper() throws Exception{ //创建一个WordCountMapper的测试对象 WordCountMapper mapper = new WordCountMapper(); //创建一个Driver进行单元测试 MapDriver<LongWritable,Text, Text, IntWritable> driver = new MapDriver(mapper); //指定Map输入的数据 driver.withInput(new LongWritable(1),new Text("I love Beijing")); //指定Map的输出 driver.withOutput(new Text("I"),new IntWritable(1)) .withOutput(new Text("love"),new IntWritable(1)) .withOutput(new Text("Beijing"),new IntWritable(1)); //执行单元测试,对比:我们希望得到的结果和实际运行的结果 driver.runTest(); }
点击鼠标右键进行运行,结果显示为绿色,说明实际输出和我们期望输出一致
不妨将上面的红框的内容进行修改一下,比如love单词的次数修改为2,然后再次运行,查看一下运行结果(左侧的状态条为红色,输出报错中有提醒,最终的love单词出现的次数为1,不是期望的2)
Mappper程序测试成功,接着就是测试一下Reduce程序
@Test public void testReducer() throws Exception{ WordCountReducer reducer = new WordCountReducer(); ReduceDriver<Text, IntWritable, Text, IntWritable> driver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(reducer); //构造Reducer输入 List ArrayList<IntWritable> value3 = new ArrayList<IntWritable>(); value3.add(new IntWritable(1)); value3.add(new IntWritable(1)); value3.add(new IntWritable(1)); driver.withInput(new Text("Beijing"),value3); //指定Reducer的输出,是我们希望得到的结果 driver.withOutput(new Text("Beijing"),new IntWritable(3)); driver.runTest();
输出的结果为:(运行状态条为绿色,测试通过)
如果将期望的输出结果调成4,运行的结果如下(证明程序可以来测试啦)
最后就是来测试运行的Job,代码如下
@Test public void testJob() throws Exception{ //创建对象 WordCountMapper mapper = new WordCountMapper(); WordCountReducer reducer = new WordCountReducer(); //创建Driver MapReduceDriver<LongWritable,Text, Text, IntWritable,Text, IntWritable> driver = new MapReduceDriver(mapper,reducer); //指定Mapper输入的数据 driver.withInput(new LongWritable(1),new Text("I love Beijing")) .withInput(new LongWritable(2),new Text("I love China")) .withInput(new LongWritable(3),new Text("Beijing is the capital of China")); //指定Reducer的输出 driver.withOutput(new Text("I"),new IntWritable(2)) .withOutput(new Text("love"),new IntWritable(2)) .withOutput(new Text("Beijing"),new IntWritable(2)) .withOutput(new Text("China"),new IntWritable(2)) .withOutput(new Text("is"),new IntWritable(1)) .withOutput(new Text("the"),new IntWritable(1)) .withOutput(new Text("capital"),new IntWritable(1)) .withOutput(new Text("of"),new IntWritable(1)); driver.runTest(); }
输出的结果为:(可以发现最终的计数是正常的,但是顺序不对)
因为MapReduce会有一个默认的排序规则,我们调整一下最后的输出的内容,然后再运行,可以发现按照字典的顺序进行输出后,状态条显示绿色,测试正确
至此,关于数据去重、多表查询、倒排索引、单元测试等案例编程就梳理完了。MapReduce的知识点也就全部完结了,下一个博客就是进行之前的内容的回顾,复习一下,查漏补缺。撒花✿✿ヽ(°▽°)ノ✿