package com.core.day2 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo13Sort { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("Demo13Sort") conf.setMaster("local") val sc = new SparkContext(conf) val kvRDD: RDD[(String, Int)] = sc.textFile("data/score.txt") .map(_.split(",")) .filter(_.length == 3) .map{ case Array(sid:String,_,sco:String) => (sid,sco.toInt) } val sum_scoreRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _) /** * sortBy: 指定一个字段进行排序,默认是升序 * ascending = false: 降序 * */ val sortRDD: RDD[(String, Int)] = sum_scoreRDD.sortBy(kv => -kv._2) sortRDD.foreach(println) } }
package com.core.day2 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo14MapValues { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("Demo14MapValues") conf.setMaster("local") val sc = new SparkContext(conf) // 读取-分割-清洗-取出数据 val idAndScoADD: RDD[(String, Int)] = sc.textFile("data/score.txt") .map(_.split(",")) .filter(_.length == 3) .map{ case Array(sid:String,cid:String,sco:String) => (sid,sco.toInt) } //统计总分 val kvRDD: RDD[(String, Int)] = idAndScoADD.reduceByKey(_ + _) /** * mapValues: 对value作处理,key可以不变 * */ // 对上述所有的数据 乘以100 val sco_100: RDD[(String, Int)] = kvRDD.mapValues(sco => sco * 100) sco_100.foreach(println) } }
mapPartitions:一次处理一个分区的数据,一个一个传递给后面的函数
迭代器中是一个分区的数据
函数的返回值也是一个迭代器
mapPartitionsWithIndex:对一个分区进行编号
package com.core.day2 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo15MapPartition { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("Demo15MapPartition") conf.setMaster("local") val sc = new SparkContext(conf) val lineRDD: RDD[String] = sc.textFile("data/words") //println(lineRDD.getNumPartitions) /** * mapPartitions:一次处理一个分区的数据,一个一个传递给后面的函数 * 迭代器中是一个分区的数据 * 函数的返回值也是一个迭代器 * */ val wordsRDD: RDD[String] = lineRDD.mapPartitions((iter:Iterator[String]) => { //在函数类对一个分区的数据进行处理 val words: Iterator[String] = iter.flatMap(_.split(",")) words }) wordsRDD.foreach(println) /** * mapPartitionsWithIndex:对一个分区进行编号 * */ wordsRDD.mapPartitionsWithIndex{ case (index:Int,iter:Iterator[String]) => println(s"mapPartitionsWithIndex:$index") iter } .foreach(println) } }