[root@host juana]# touch data.txt
[root@host juana]# vim data.txt
liubei,sunshangxiang,zhaoyun
minyue,guanyu,juyoujin,nakelulu
liubei,libai
libai,guanyu,bailishouyue
二、Implementation
1、Method 1: reduceByKey
This is the most basic approach.
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      // Flatten each line into individual words
      .flatMap(line => line.split(","))
      // Pair each word with 1, then sum the counts per key
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      // Print each result
      .foreach(println)
    // Shut down the environment
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
2、Method 2: groupBy
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      .flatMap(line => line.split(","))
      .map(data => (data, 1))
      // Group by the word, then count the size of each group
      .groupBy(_._1)
      .map(data => (data._1, data._2.size))
      .foreach(println)
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
3、Method 3: groupByKey
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      .flatMap(line => line.split(","))
      .map(data => (data, 1))
      // Group the (word, 1) pairs by key, then count the size of each group
      .groupByKey()
      .map(data => (data._1, data._2.size))
      .foreach(println)
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
4、Method 4: aggregateByKey
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      .flatMap(line => line.split(","))
      .map(data => (data, 1))
      // Zero value 0; intra-partition and inter-partition functions are both addition
      .aggregateByKey(0)(_ + _, _ + _)
      .foreach(println)
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
5、Method 5: foldByKey
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      .flatMap(line => line.split(","))
      .map(data => (data, 1))
      // Zero value 0; one function covers both intra- and inter-partition aggregation
      .foldByKey(0)(_ + _)
      .foreach(println)
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
6、Method 6: combineByKey
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    sc.textFile("data/data.txt")
      .flatMap(line => line.split(","))
      .map(data => (data, 1))
      // createCombiner keeps the first value; mergeValue and mergeCombiners both add
      .combineByKey(v => v, (x: Int, y) => x + y, (x: Int, y) => x + y)
      .foreach(println)
    sc.stop()
  }
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
7、Method 7: countByKey
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    val Rdd: RDD[String] = sc.textFile("data/data.txt")
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(",")
      strings
    })
    // countByKey is an action: it returns the per-key counts to the driver as a Map
    val stringToLong: collection.Map[String, Long] = rdd.map(data => (data, 1)).countByKey()
    println(stringToLong)
    sc.stop()
  }
}
output
Map(nakelulu -> 1, juyoujin -> 1, sunshangxiang -> 1, libai -> 2, minyue -> 1, zhaoyun -> 1, liubei -> 2, guanyu -> 2, bailishouyue -> 1)
8、Method 8: countByValue
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    val Rdd: RDD[String] = sc.textFile("data/data.txt")
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(",")
      strings
    })
    // countByValue is an action: it counts each distinct word directly
    val stringToLong: collection.Map[String, Long] = rdd.countByValue()
    println(stringToLong)
    sc.stop()
  }
}
output
Map(nakelulu -> 1, juyoujin -> 1, sunshangxiang -> 1, libai -> 2, minyue -> 1, zhaoyun -> 1, liubei -> 2, guanyu -> 2, bailishouyue -> 1)
9、Method 9: reduce
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    val Rdd: RDD[String] = sc.textFile("data/data.txt")
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(",")
      strings
    })
    // Wrap each word in a single-entry mutable Map, then merge all maps with reduce
    val RDD1: RDD[mutable.Map[String, Long]] = rdd.map(word => mutable.Map[String, Long]((word, 1L)))
    val stringToInt: mutable.Map[String, Long] = RDD1.reduce((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Long = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)
    sc.stop()
  }
}
output
Map(nakelulu -> 1, juyoujin -> 1, sunshangxiang -> 1, libai -> 2, minyue -> 1, zhaoyun -> 1, liubei -> 2, guanyu -> 2, bailishouyue -> 1)
10、Method 10: aggregate
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    val Rdd: RDD[String] = sc.textFile("data/data.txt")
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(",")
      strings
    })
    // Wrap each word in a single-entry mutable Map, then merge everything with aggregate
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
    val stringToInt: mutable.Map[String, Int] = RDD1.aggregate(mutable.Map[String, Int]())(
      // Intra-partition merge
      (map1, map2) => {
        map2.foreach {
          case (word, count) =>
            val newCount: Int = map1.getOrElse(word, 0) + count
            map1.update(word, newCount)
        }
        map1
      },
      // Inter-partition merge
      (map1, map2) => {
        map2.foreach {
          case (word, count) =>
            val newCount: Int = map1.getOrElse(word, 0) + count
            map1.update(word, newCount)
        }
        map1
      })
    println(stringToInt)
    sc.stop()
  }
}
output
Map(nakelulu -> 1, juyoujin -> 1, sunshangxiang -> 1, libai -> 2, minyue -> 1, zhaoyun -> 1, liubei -> 2, guanyu -> 2, bailishouyue -> 1)
11、Method 11: fold
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object WordCount {
  def main(args: Array[String]): Unit = {
    // Configure the Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
    // Create the SparkContext
    val sc = new SparkContext(conf)
    // Read the file
    val Rdd: RDD[String] = sc.textFile("data/data.txt")
    val rdd: RDD[String] = Rdd.flatMap(line => {
      val strings: Array[String] = line.split(",")
      strings
    })
    // Wrap each word in a single-entry mutable Map, then merge everything with fold
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
    val stringToInt: mutable.Map[String, Int] = RDD1.fold(mutable.Map[String, Int]())((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)
    sc.stop()
  }
}
output
Map(nakelulu -> 1, juyoujin -> 1, sunshangxiang -> 1, libai -> 2, minyue -> 1, zhaoyun -> 1, liubei -> 2, guanyu -> 2, bailishouyue -> 1)
Those are our 11 implementations. Now let's wrap up with a summary.
You may have noticed that Methods 1 through 6 print one record at a time, while from Method 7 onward the result is a Map. Is that a coincidence, or something more sinister?
Joking aside, the reason is that the core operators behind the implementations differ: Methods 1 through 6 are built on Transformation operators, while Methods 7 through 11 are built on Action operators.
Let's take a closer look.
Transformations are lazy operators: they are only evaluated when their results are actually needed. Actions are eager operators: they immediately generate and submit a job for execution, and each Action corresponds to one job.
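Here is a minimal sketch of that difference, reusing the data/data.txt file and local setup from above (the object name LazyVsEager is just for illustration): the flatMap only records the lineage, and no job runs until count() is called.

import org.apache.spark.{SparkConf, SparkContext}

object LazyVsEager {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("lazy-vs-eager")
    val sc = new SparkContext(conf)

    // Transformation: nothing is read or computed yet, Spark only records the lineage
    val words = sc.textFile("data/data.txt").flatMap(_.split(","))

    // Action: this call submits one job, which actually reads the file and counts the words
    println(words.count())

    sc.stop()
  }
}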
Operator type | Key operator | Description |
---|---|---|
Transformation | groupBy | Grouping operator for value-type RDDs; you must specify the grouping key yourself; returns pairs of (key, iterator). |
Transformation | groupByKey | Grouping operator for key-value RDDs; no grouping function is needed, it groups directly by key; returns pairs of (key, iterator). |
Transformation | reduceByKey | Group-and-aggregate operator for key-value RDDs; groups directly by key; you supply one aggregation function (the intra-partition and inter-partition functions are the same). |
Transformation | aggregateByKey | Group-and-aggregate operator for key-value RDDs; groups directly by key; you supply aggregation functions (the intra-partition and inter-partition functions can differ) plus an initial value; the call is curried (see the sketch after this table). |
Transformation | foldByKey | Group-and-aggregate similar to aggregateByKey, except foldByKey expresses that the intra-partition and inter-partition logic are the same; the call is curried. |
Transformation | combineByKey | Group-and-aggregate operator for key-value RDDs with three parameters: the first transforms the first value seen for each key, the second is the intra-partition aggregation function, the third is the inter-partition aggregation function. The compiler may not be able to infer the type of the intermediate value, so an explicit type annotation can be required. |
Action | countByKey | Groups and counts by key; implemented on top of reduceByKey(_ + _). |
Action | countByValue | Counts each value directly; implemented on top of countByKey: map(value => (value, null)).countByKey(). |
Action | reduce | Aggregation action; you implement the merge logic yourself, which is exactly its value: full control over the merge. See the usage above (Method 9). |
Action | fold | Like reduce, but with one extra parameter: the initial value of the intermediate accumulator. |
Action | aggregate | Performs both intra-partition and inter-partition aggregation; takes one more parameter than fold, letting you set a local (per-partition) aggregation function and a global one separately. |
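To make the intra-partition vs. inter-partition distinction concrete, here is a small sketch (the object name AggregateByKeyDemo and the toy data are made up for illustration, not taken from the word-count example): within each partition we keep the maximum value per key, and across partitions we sum those maxima, so the two functions passed to aggregateByKey really do different things.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("aggDemo"))

    // Toy key-value data spread over two partitions
    val pairs = sc.makeRDD(Seq(("a", 1), ("a", 3), ("b", 2), ("b", 4)), 2)

    // Curried call: (zero value)(intra-partition function, inter-partition function)
    val result = pairs.aggregateByKey(0)(
      (acc, v) => math.max(acc, v), // within a partition: keep the max per key
      (x, y) => x + y               // across partitions: add the partial results
    )

    result.collect().foreach(println)
    sc.stop()
  }
}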
These are only brief introductions to the operators; we will dig into their internals and source code later. For now, read these descriptions alongside the usage above. Thanks for bearing with me.