import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class Weather13Mapper extends Mapper<LongWritable, Text,Text, DoubleWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //super.map(key, value, context); String line=value.toString(); //[2018年01月01日 阴/小雨 11℃/8℃ 东南风<3级/北风<3级] String[] arr=line.split(" "); FileSplit inputSplit = (FileSplit) context.getInputSplit(); String name = inputSplit.getPath().getName (); // String t1=name.substring(0,4); String t2=name.substring(5,7); // String time=t1+"/"+t2; // System.out.println(t1+"/"+t2); if (arr.length>2){ // String weather=arr[2]; // System.out.println(weather); String w=arr[2].replace("℃",""); // System.out.println(w); String[] w1=w.split("/"); double weather=(Double.parseDouble(w1[0])+Double.parseDouble(w1[w1.length-1]))/2; context.write(new Text(t2),new DoubleWritable(weather)); } } }
import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class Weather13Reducer extends Reducer<Text, DoubleWritable,Text,DoubleWritable> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { //super.reduce(key, values, context); double sum=0; double sc=0; // double avg_weather=0; for (DoubleWritable i:values){ sum+=i.get(); sc++; } // avg_weather=sum/sc; context.write(key,new DoubleWritable(Double.parseDouble(String.format("%.1f",sum/sc)))); } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.FileOutputStream; import java.io.IOException; public class Weather13Runner { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration(); //创建job Job job= Job.getInstance(conf,"weather"); //设置输入输出路径 FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //设置运行类 job.setJarByClass(Weather13Runner.class); job.setMapperClass(Weather13Mapper.class); job.setReducerClass(Weather13Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); System.exit(job.waitForCompletion(true)?0:1); } }