今天学完了单 Value 类型的算子,并开始学习双 Value 类型的算子。
(1)distinct
/** Demonstrates `distinct`: removes duplicate elements from an RDD and prints the result.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // distinct: de-duplicate, so each of 1, 2, 3, 4 is kept exactly once.
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
    val rdd1: RDD[Int] = rdd.distinct()
    rdd1.collect().foreach(println)
  } finally {
    // Release the context even when the job above throws.
    sc.stop()
  }
}
(2)coalesce
/** Demonstrates `coalesce`: merges the partitions of an RDD into a smaller, fixed number
  * of partitions and writes the result out as text files.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // coalesce: shrink the 4 input partitions down to 2 to save resources.
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
    // shuffle = true asks Spark to redistribute the data through a shuffle instead of
    // simply merging neighbouring partitions.
    val newrdd: RDD[Int] = rdd.coalesce(2, shuffle = true)
    // NOTE(review): "outout" looks like a typo for "output"; also, saveAsTextFile presumably
    // fails when the target directory already exists — confirm before re-running.
    newrdd.saveAsTextFile("outout")
  } finally {
    // Release the context even when the job above throws.
    sc.stop()
  }
}
(3)repartition
/** Demonstrates `repartition`: redistributes an RDD into the requested number of partitions
  * and writes the result out as text files.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // repartition: can raise or lower the partition count. It is typically used to
    // increase it, but this example actually goes from 4 partitions down to 3.
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
    val newrdd: RDD[Int] = rdd.repartition(3)
    // NOTE(review): "outout" looks like a typo for "output"; also, saveAsTextFile presumably
    // fails when the target directory already exists — confirm before re-running.
    newrdd.saveAsTextFile("outout")
  } finally {
    // Release the context even when the job above throws.
    sc.stop()
  }
}
(4)sortBy
/** Demonstrates `sortBy` on an RDD of integers, using each element itself as the sort key.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // sortBy: order the elements by the key the given function extracts.
    val rdd = sc.makeRDD(List(6, 2, 8, 1, 5, 3), 2)
    // The key is the number itself; no direction is passed, so sortBy's default applies.
    val newrdd: RDD[Int] = rdd.sortBy(num => num)
    newrdd.collect().foreach(println)
  } finally {
    // Release the context even when the job above throws.
    sc.stop()
  }
}
/** Demonstrates `sortBy` on an RDD of (String, Int) pairs, ordering by the numeric value
  * of the string key rather than by the string itself.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // sortBy: order the elements by the key the given function extracts.
    val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
    // Sorting by t._1 alone would compare the strings character by character
    // ("1" < "11" < "2"); converting with toInt compares the numbers they represent
    // (1 < 2 < 11).
    val newrdd: RDD[(String, Int)] = rdd.sortBy(t => t._1.toInt)
    newrdd.collect().foreach(println)
  } finally {
    // Release the context even when the job above throws.
    sc.stop()
  }
}
(5)双value
/** Demonstrates the two-RDD ("double value") operators: `intersection`, `union`,
  * `subtract` and `zip`, printing each result as a comma-separated line.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
  val sc = new SparkContext(sparkConf)
  try {
    // Two source RDDs of the same element type and the same length.
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 7, 8))

    // Intersection: elements present in both RDDs.
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))

    // Union: all elements of both RDDs.
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // Difference: elements of rdd1 that do not appear in rdd2.
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))

    // Zip: pair up elements by position. Per the Spark API docs, both RDDs must have the
    // same number of partitions and the same number of elements in each partition.
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))
  } finally {
    // Release the context even when one of the jobs above throws.
    sc.stop()
  }
}