---------------------------------- Main program entry ----------------------------------
package com.demo01.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    /**
     * Main entry point. Passing the Configuration to ToolRunner.run() is
     * equivalent to assigning it to the Configured parent class; the call
     * returns the program's exit status code (0 = success).
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new JobMain(), args);
        System.exit(run);
    }

    /**
     * The run method is the important one: it wires the eight MapReduce steps
     * together through a Job object.
     * @param strings command-line arguments
     * @return exit code (0 = success, 1 = failure)
     * @throws Exception
     */
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Read the input files and parse them into key/value pairs.
        //    First argument is the Configuration, second is the job name.
        Job job = Job.getInstance(super.getConf(), "XXX");
        // Set the main class of the program.
        job.setJarByClass(JobMain.class);
        // Set the input format the job reads.
        job.setInputFormatClass(TextInputFormat.class);
        // Set the input path to process.
        // On the HDFS cluster:
        // FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
        // Local test:
        FileInputFormat.addInputPath(job, new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\input"));

        // 2. Set the custom mapper class.
        job.setMapperClass(WordCountMapper.class);
        // Set the types of k2 and v2.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        /*
         * Steps 3 to 6 (the shuffle, defaults are used here):
         *   partition - values with the same key are sent to the same reducer
         *   sort
         *   combine
         *   group     - equal keys are merged and their values form one collection
         */

        // 7. Set the custom reduce logic.
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 8. Set the output path.
        //    The path must not already exist, otherwise the job fails.
        // TextOutputFormat.setOutputPath(job, new Path("hdfs://node01/wordcountoutput"));
        // Local test:
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\output"));

        // Submit the job to the cluster and wait for it to finish.
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
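The driver above leaves steps 3 to 6 at their defaults. As a minimal sketch of how they could be made explicit, the lines below could be added inside run() before the output step. This assumes it is acceptable to reuse WordCountReduce as the combiner (safe here because summation is associative); HashPartitioner is already Hadoop's default partitioner, so that call only restates the default.

        // Optional sketch: make the shuffle steps explicit (assumption: combiner reuse is desired).
        job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); // step 3: partition by key hash (default)
        job.setCombinerClass(WordCountReduce.class); // step 5: map-side pre-aggregation, reusing the sum logic
        job.setNumReduceTasks(1);                    // one reduce task -> one output file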
---------------------------------- Mapper program ----------------------------------
package com.demo01.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The generic types are Hadoop's wrappers around the Java primitives, which speed up
// network serialization. The four parameters stand for k1, v1, k2, v2.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * Override map(): the custom logic that converts k1/v1 into k2/v2.
     * @param key     k1, the byte offset of the line
     * @param value   v1, the line of text
     * @param context the context object that connects this component to the next one
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Example line: hive,sqoop,flume,hello
        String[] split = value.toString().split(",");
        // Emit a (k2, v2) pair downstream for each word.
        for (String word : split) {
            Text k2 = new Text(word);
            LongWritable v2 = new LongWritable(1);
            context.write(k2, v2);
        }
    }
}
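To make the map step concrete, here is a small standalone sketch (not part of the job; the input line is hypothetical sample data) that prints the same (word, 1) pairs the mapper writes to the context:

public class MapStepDemo {
    public static void main(String[] args) {
        String line = "hive,sqoop,flume,hello";     // hypothetical v1
        for (String word : line.split(",")) {
            System.out.println(word + "\t" + 1L);   // the k2/v2 pairs the mapper emits
        }
    }
}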
---------------------------------- Reducer program ----------------------------------
package com.demo01.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// The generic types stand for k2, v2, k3, v3.
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * @param key     k2, a word
     * @param values  an iterable of all v2 values grouped under this key
     * @param context the context object used to emit k3/v3
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long num = 0;
        for (LongWritable value : values) {
            // LongWritable has no arithmetic methods; get() unwraps it into a plain Java long.
            num += value.get();
        }
        context.write(key, new LongWritable(num));
    }
}
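For one key, the shuffle hands the reducer all of that key's 1s. A small standalone sketch of the same summation, using hypothetical sample values for the key "hello":

import java.util.Arrays;
import java.util.List;

public class ReduceStepDemo {
    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 1L, 1L); // hypothetical v2 values grouped under "hello"
        long num = 0;
        for (long v : values) {
            num += v;                                   // same summation as WordCountReduce
        }
        System.out.println("hello\t" + num);            // -> hello	3
    }
}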