① FileInputFormat读取数据
② Mapper阶段对数据简单处理
③ 序列化实现自定义排序
④ partition分区处理
⑤ Reducer写出数据
⑥ 主类设置
public class RankDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 加载主类 job.setJarByClass(RankDriver.class); // 设置Mapper和Reducer类 job.setMapperClass(RankMapper.class); job.setReducerClass(RankReducer.class); // 设置Mapper数据的数据类型 job.setMapOutputKeyClass(RankBean.class); job.setMapOutputValueClass(Text.class); // 设置最终数据的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(RankBean.class); // 设置Partition分区和分区个数 job.setPartitionerClass(RankPartitioner.class); job.setNumReduceTasks(6); // 文件输入输出路径 FileInputFormat.setInputPaths(job, new Path("E:\\test\\data\\*")); FileOutputFormat.setOutputPath(job, new Path("E:\\test\\RankTopKOut")); // 提交job boolean result = job.waitForCompletion(true); // 判断结束 System.exit(result ? 0 : 1); } }
① 实现WritableComparable接口,并传入比较对象,一般来说比较对象为自身。
② 设置空参构造器
③ 重写序列化方法(write and readFields)
④ 重写compareTo方法,方法体用于实现自定义排序
⑤ 重写toString方法,用于最终的数据写出。
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class RankBean implements WritableComparable<RankBean> { private String module; // 学校类型 private double score; // 学校评分 private String position; // 学校位置 public RankBean() { } public String getModule() { return module; } public void setModule(String module) { this.module = module; } public double getScore() { return score; } public void setScore(double score) { this.score = score; } public String getPosition() { return position; } public void setPosition(String position) { this.position = position; } @Override public int compareTo(RankBean o) { if (this.score > o.score) { return -1; }else if (this.score < o.score) { return 1; }else { return 0; } } @Override public void write(DataOutput out) throws IOException { out.writeUTF(module); out.writeDouble(score); out.writeUTF(position); } @Override public void readFields(DataInput in) throws IOException { this.module = in.readUTF(); this.score = in.readDouble(); this.position = in.readUTF(); } @Override public String toString() { return module + "\t" + position + "\t" + score ; } }
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class RankMapper extends Mapper<LongWritable, Text, RankBean, Text> { private RankBean outK = new RankBean(); private Text outV = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); // 分割获取对应数据 String name = split[0]; String position = split[1]; String mold = split[2]; String score = split[3]; // 存入数据 outV.set(name); outK.setModule(mold); outK.setPosition(position); outK.setScore(Double.parseDouble(score)); // 写出数据 context.write(outK,outV); } }
① 继承Partitioner类,泛型为Mapper输出的数据类型
② 重写getPartition方法,实现分区
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes each record to one of six reduce partitions based on the school's
 * location stored in the {@code RankBean} key.
 *
 * <p>Partition map: 北京→0, 上海→1, 天津→2, 江苏→3, 河南→4, anything else
 * (including a missing location) →5.
 */
public class RankPartitioner extends Partitioner<RankBean, Text> {

    @Override
    public int getPartition(RankBean rankBean, Text text, int numPartitions) {
        String location = rankBean.getPosition();
        // A null location falls into the catch-all bucket, matching the
        // behavior of the equals-based chain this replaces.
        if (location == null) {
            return 5;
        }
        switch (location) {
            case "北京":
                return 0;
            case "上海":
                return 1;
            case "天津":
                return 2;
            case "江苏":
                return 3;
            case "河南":
                return 4;
            default:
                return 5;
        }
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: swaps key and value so the final output is
 * {@code school-name \t bean.toString()}, one line per school.
 *
 * <p>Keys arrive already sorted by score (descending) within each partition,
 * so simply echoing every record preserves the ranking order.
 */
public class RankReducer extends Reducer<RankBean, Text, Text, RankBean> {

    @Override
    protected void reduce(RankBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Emit (name, bean) for every school grouped under this key.
        for (Text name : values) {
            context.write(name, key);
        }
    }
}