MapReduce是一个分布式运算程序的编程框架,使用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
用户只关心业务逻辑。实现框架的接口。
可以动态增加服务器,解决计算资源不够的问题。
任何一个节点宕机,可以将任务转移到其他节点
TB/PB
几千台服务器共同计算
sparkstreaming ,flink擅长
spark适合
<1>分为Map阶段和Reduce阶段
<2>Map阶段并发MapTask,完全并行运行,互不干扰
<3>Reduce阶段的并发ReduceTask,完全并行运行,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
<4>MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那么只能多个MapReduce程序,串行运行
负责整个程序的过程调度以及状态协调
负责Map阶段的整个数据处理流程
负责Reduce阶段的整个数据处理流程
用户编写的程序分为三部分:Mapper、Reducer 和Driver
相当于YARN集群的客户端,用于提交我们整个程序到YRAN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
<1>在给定的文本文件中统计输出每个单词出现的总次数
<2>输入数据: hello.txt
<3>期望输出数据: 统计结果
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> </dependency> </dependencies>
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
com.demo.mapreduce.wordcount
package com.demo.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * <KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN Map阶段输入的key的类型 :LongWritable * VALUEIN Map阶段输入的value的类型: Text * KEYOUT Map阶段输出的key的类型 : Text * VALUEOUT Map阶段输出的value的类型 : IntWritable */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //1. 获取一行数据 String line = value.toString(); //2. 对数据进行切割 String[] words = line.split(" "); //3. 循环写出 for (String word : words) { outK.set(word); context.write(outK, outV); } } }
package com.demo.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * <KEYIN,VALUEIN,KEYOUT,VALUEOUT> * KEYIN Reducer阶段输入的key的类型 :Text * VALUEIN Reducer阶段输入的value的类型: IntWritable * KEYOUT Reducer阶段输出的key的类型 : Text * VALUEOUT Reducer阶段输出的value的类型 : IntWritable */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { //1. 累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } //2. 写出 outV.set(sum); context.write(key, outV); } }
package com.demo.mapreduce.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 1. 获取job * 2. 设置jar包路径 * 3. 关联mapper和reducer * 4. 设置map的输出的key 和 value 类型 * 5. 设置最终输出的key 和 value 的类型 * 6. 指定job的输入原始文件所在目录 * 7.指定job的输出结果所在目录(不能提前存在) * 8.提交作业 */ public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1. 获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2. 设置jar包路径 job.setJarByClass(WordCountDriver.class); //3. 关联mapper和reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4. 设置map的输出的key 和 value 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5. 设置最终输出的key 和 value 的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 6. 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path("E:\\javaWordspaces\\MapReduceDemo\hello.txt")); //7.指定job的输出结果所在目录(不能提前存在) FileOutputFormat.setOutputPath(job, new Path("E:\\javaWordspaces\\MapReduceDemo\\result")); // 8.提交作业 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
<build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
HDFS 下创建 /wcinput/word.txt
hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.demo.mapreduce.wordcount.WordCountDriver /wcinput/word.txt /wcoutput