1、将测试数据上传到HDFS目录下,这里放到根目录下:/test.txt
2、在master节点中某个目录下:创建mapper、reducer以及run.sh
import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print "%s\t%s" % (word, 1)
import sys current_word = None current_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: current_count += count else: if current_word: print "%s\t%s" % (current_word, current_count) current_count = count current_word = word if word == current_word: print "%s\t%s" % (current_word, current_count)
#!/usr/bin/bash streaming_jar="/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.2.jar" input="/test.txt" output="/output" hadoop fs -rmr $output hadoop jar ${streaming_jar} \ -files mapper.py,reducer.py \ -jobconf mapred.job.priority="VERY_HIGH" \ -jobconf mapred.map.tasks=5 \ -jobconf mapred.job.map.capacity=5 \ -jobconf mapred.job.name="streaming_wordcount" \ -input $input \ -output $output \ -mapper "python mapper.py" \ -reducer "python reducer.py" if [ $? -ne 0 ];then echo "streaming_wordcount job failed" exit 1 fi
3、运行sh run.sh
..... 2022-09-11 03:06:09,869 INFO mapreduce.Job: map 0% reduce 0% 2022-09-11 03:06:15,931 INFO mapreduce.Job: map 14% reduce 0% 2022-09-11 03:06:20,971 INFO mapreduce.Job: map 100% reduce 0% 2022-09-11 03:06:21,979 INFO mapreduce.Job: map 100% reduce 100% 2022-09-11 03:06:21,994 INFO mapreduce.Job: Job job_1662694559814_0004 completed successfully .....
进入HDFS Web管理界面-Utilities-Browse the file system可以看到词频统计结果已写到HDFS根目录/output中