Experiment Principle
The main purpose of the "data deduplication" exercise is to grasp and apply the idea of parallelization to filter data in a meaningful way. Seemingly complex tasks such as counting the number of distinct items in a large data set or extracting visitors' locations from website access logs all involve data deduplication.
The ultimate goal of deduplication is that any data item appearing more than once in the raw input appears only once in the output file. In the MapReduce flow, the <key, value> pairs emitted by map are grouped by the shuffle phase into <key, value-list> pairs and handed to reduce. The natural idea is therefore to route all records of the same data item to a single reduce task: no matter how many times the item occurs, it only needs to be written once in the final result. Concretely, the reduce input should use the data item itself as the key and place no requirement on the value-list (it can be empty). When reduce receives a <key, value-list>, it simply copies the input key to the output key, sets the value to empty, and emits <key, value>.
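As a rough command-line analogue of the job implemented below (the sample records here are hypothetical two-field lines, not the real contents of buyer_favorite1), deduplicating on the second comma-separated field looks like this:
# Hypothetical input lines: buyer id, item id
printf '10181,1000481\n10182,1000481\n10183,1000911\n' | cut -d, -f2 | sort -u
# Prints 1000481 and 1000911 once each, no matter how many times they occurred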
1. Start Hadoop on Linux:
start-all.sh
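You can optionally confirm that the Hadoop daemons started before moving on, for example:
jps
# expect processes such as NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager
# (the exact list depends on how your cluster is configured)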
2. Create the local directory /data/mapreduce2 on Linux:
mkdir -p /data/mapreduce2
3. Download hadoop2lib from the internet and unzip it into the mapreduce2 directory created in the previous step.
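A minimal sketch of this step, assuming the downloaded archive is a zip file named hadoop2lib.zip saved under /data/mapreduce2 (both the file name and the download location are assumptions):
cd /data/mapreduce2
unzip hadoop2lib.zip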
4. Create the /mymapreduce2/in directory on HDFS, then upload the buyer_favorite1 file from the local /data/mapreduce2 directory to the /mymapreduce2/in directory on HDFS:
hadoop fs -mkdir -p /mymapreduce2/in
hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in
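You can confirm that the upload succeeded by listing the HDFS directory:
hadoop fs -ls /mymapreduce2/in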
5. Write the code in IDEA:
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;
public class Filter {
    // Mapper: split each line on commas, use the second field as the output key,
    // and emit an empty (NullWritable) value.
    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        private static Text newKey = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line);
            String[] arr = line.split(",");
            newKey.set(arr[1]);
            context.write(newKey, NullWritable.get());
            System.out.println(newKey);
        }
    }

    // Reducer: the shuffle phase has already grouped identical keys together,
    // so writing each key exactly once removes the duplicates.
    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "root");
        BasicConfigurator.configure(); // quickly set up a default Log4j environment
        Configuration conf = new Configuration();
        System.out.println("start");
        Job job = Job.getInstance(conf, "filter");
        job.setJarByClass(Filter.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://192.168.149.10:9000/mymapreduce2/in/buyer_favorite1");
        Path out = new Path("hdfs://192.168.149.10:9000/mymapreduce2/in/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
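Note that FileOutputFormat requires the output directory not to exist yet; if you need to re-run the job, delete the previous output first (the path below matches the one hard-coded in main):
hadoop fs -rm -r /mymapreduce2/in/out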
6. Create a resources folder and, inside it, a log4j.properties file:
hadoop.root.logger=DEBUG, console
log4j.rootLogger=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
7. Import the hadoop2lib jars into the project.
8. Run the job and view the results.
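When the job finishes, the deduplicated results can be inspected directly on HDFS. Assuming the default single reducer, the output file is part-r-00000 under the output path configured in the code:
hadoop fs -ls /mymapreduce2/in/out
hadoop fs -cat /mymapreduce2/in/out/part-r-00000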
Note: if an error is reported, configure HADOOP_USER_NAME; the value after the equals sign should be the username used in your virtual machine.
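If you run the job from a terminal instead of from IDEA (a situation this lab may not require), the same setting can be supplied as an environment variable rather than via System.setProperty; the value root simply mirrors what the code above uses:
export HADOOP_USER_NAME=root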