Python教程

Hadoop+Python测试wordcount

本文主要是介绍Hadoop+Python测试wordcount,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、将测试数据上传到HDFS目录下,这里放到根目录下:/test.txt
2、在master节点中某个目录下:创建mapper、reducer以及run.sh

  • mapper.py
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)
  • reducer.py
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:
    print "%s\t%s" % (current_word, current_count)
  • run.sh
#!/usr/bin/bash

streaming_jar="/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.2.jar"
input="/test.txt"
output="/output"
hadoop fs -rmr $output
hadoop jar ${streaming_jar} \
        -files mapper.py,reducer.py \
        -jobconf mapred.job.priority="VERY_HIGH" \
        -jobconf mapred.map.tasks=5 \
        -jobconf mapred.job.map.capacity=5 \
        -jobconf mapred.job.name="streaming_wordcount" \
        -input $input \
        -output $output \
        -mapper "python mapper.py" \
        -reducer "python reducer.py"
if [ $? -ne 0 ];then
        echo "streaming_wordcount job failed"
        exit 1
fi

3、运行sh run.sh

.....

2022-09-11 03:06:09,869 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-11 03:06:15,931 INFO mapreduce.Job:  map 14% reduce 0%
2022-09-11 03:06:20,971 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-11 03:06:21,979 INFO mapreduce.Job:  map 100% reduce 100%
2022-09-11 03:06:21,994 INFO mapreduce.Job: Job job_1662694559814_0004 completed successfully

.....

进入HDFS Web管理界面-Utilities-Browse the file system可以看到词频统计结果已写到HDFS根目录/output中

这篇关于Hadoop+Python测试wordcount的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!