RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
数据存储在大数据集群不同节点上
RDD 封装了计算逻辑,并不保存数据
RDD 是一个抽象类,需要子类具体实现
RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD 里面封装计算逻辑
RDD和IO流的关系
2)Spark 通过申请资源创建调度节点和计算节点
3)Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
4)调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算
object Spark_rdd_01 { def main(args: Array[String]): Unit = { //TODO 准备环境 val conf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(conf) //TODO 创建RDD //从内存中创建RDD,将内存中集合的数据作为处理的数据源 val seq=Seq[Int](1,2,3,4) //parallelize:并行 //val rdd: RDD[Int] = sc.parallelize(seq) //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法 val rdd: RDD[Int] = sc.makeRDD(seq) rdd.collect().foreach(println) //TODO 关闭环境 sc.stop() } }
1 2 3 4
object Spark_rdd_01 { def main(args: Array[String]): Unit = { //TODO 准备环境 val conf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(conf) //TODO 创建RDD //从文件中创建RDD,将文件中的数据作为处理的数据源 //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径 //path路径可以是文件的具体路径,也可以是目录名称 //val rdd=sc.textFile("datas") //path路径还可以使用通配符 //val rdd=sc.textFile("datas/1*.txt") //path还可以是分布式存储系统的路径:HDFS //val rdd=sc.textFile("hdfs://linux1:8020/test.txt") //textFile:以行为单位来读取数据,读取的数据都是字符串 //wholeTextFiles:以文件为单位读取数据 //val rdd = sc.wholeTextFiles("datas") //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容 val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt") rdd.collect().foreach(println) //TODO 关闭环境 sc.stop() } }
Hello World Hello Spark hello scala hello Spark
object Spark_rdd_01 { def main(args: Array[String]): Unit = { //TODO 准备环境 val conf = new SparkConf().setMaster("local[*]").setAppName("RDD") //conf.set("spark.default.parallelism","5") 可以手动配置核数 val sc = new SparkContext(conf) //TODO 创建RDD //RDD的并行度&分区 //makeRDD方法可以传递第二个参数,这个参数表示分区的数量 //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认为) //源码scheduler.conf.getInt("spark.default.parallelism",totalCores) //spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism //如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数 val rdd: RDD[Int] = sc.makeRDD( List(1, 2, 3, 4), 2 ) //将处理的数据保存成分区文件 rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") //TODO 关闭环境 sc.stop() } }
object Spark_rdd_01 { def main(args: Array[String]): Unit = { //TODO 准备环境 val conf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(conf) //TODO 创建RDD //textFile可以将文件作为数据处理的数据源,默认也可以设定分区,默认分区数是2 //minPartition:最小分区数量 //math.min(defaultParallelism,2) //如果不想使用默认的分区数量,可以通过第二个参数指定分区数 //Spark读取文件,底层其实使用的是Hadoop的读取方式 //分区数量的计算方式: // totalSize=7 // goalSize=7/2=3(byte) // 7/3=2...1(1.1)+1=3(分区) val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",3) //将处理的数据保存成分区文件 rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") //TODO 关闭环境 sc.stop() } }
1.txt
1 2 3
object Spark_rdd_01 { def main(args: Array[String]): Unit = { //TODO 准备环境 val conf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(conf) //TODO 创建RDD //TODO 数据分区的分配 //1.数据以行为单位进行读取 // spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系 //2.数据读取时以偏移量为单位,偏移量不会被重复读取 /*字节 偏移量 * 1@@ => 012 * 2@@ => 345 * 3 => 6 */ //3.数据分区的偏移量范围的计算 /*分区 偏移量范围 * 0 => [0,3] =>12 * 1 => [3,6] =>3 * 2 => [6,7] => */ //如果数据源为多个文件,那么计算分区时以文件为单位进行分区 val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt",2) //将处理的数据保存成分区文件 rdd.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") //TODO 关闭环境 sc.stop() } }
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-map val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) //1,2,3,4 //2,4,6,8 //转换函数 // def mapFunction(num:Int):Int={ // num*2 // } // val mapRDD: RDD[Int] = rdd.map(mapFunction) // val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2}) //匿名函数 val mapRDD: RDD[Int] = rdd.map(_*2) mapRDD.collect().foreach(println) sc.stop() } }
2 4 6 8
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-map val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) //1,2一个分区 3,4一个分区 //1.rdd的计算一个分区内的数据是一个一个执行逻辑 // 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据 // 分区内数据的执行是有序的 //2.不同分区数据计算是无序的 val mapRDD1= rdd.map( x => { println("List:"+x) x } ) val mapRDD2= mapRDD1.map( x => { println("转换后List:" + x) x }) mapRDD2.collect() sc.stop() } }
List:1 List:3 转换后List:1 转换后List:3 List:2 List:4 转换后List:2 转换后List:4
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-mapPartitions //mapPartitions:可以以分区为单位进行数据转换操作 // 但是会将整个分区的数据加载到内存进行引用 // 如果处理完的数据是不会被释放掉,存在对象的引用 // 在内存较小,数据量较大的场合下,容易出现内存溢出 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) val rdd1: RDD[Int] = rdd.mapPartitions(iter => { println("一个分区执行一次") iter.map(_ * 2) }) rdd1.collect().foreach(println) sc.stop() } }
一个分区执行一次 一个分区执行一次 2 4 6 8
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-mapPartitions val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) //[1,2] [3,4] //可以得出各分区内的最大值 val rdd1 = rdd.mapPartitions(iter => { List(iter.max).iterator }) rdd1.collect().foreach(println) sc.stop() } }
2 4
map和mapPartitions的区别
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-mapPartitionsWithIndex val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) // [1,2] [3,4] // [3,4] val rdd1: RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => { if (index == 1) { iter } else { Nil.iterator //Nil返回空集合 } }) rdd1.collect().foreach(println) sc.stop() } }
3 4
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-mapPartitionsWithIndex val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 1,2,3,4 // (分区号,数字) val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => { iter.map(x => (index, x)) }) rdd1.collect().foreach(println) sc.stop() } }
(2,1) (5,2) (8,3) (11,4)
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-flatMap val rdd: RDD[List[Int]] = sc.makeRDD(List( List(1, 2), List(3,4))) val rdd1: RDD[Int] = rdd.flatMap(x=>x) rdd.collect().foreach(println) rdd1.collect().foreach(println) sc.stop() } }
List(1, 2) List(3, 4) 1 2 3 4
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-flatMap val rdd: RDD[String] = sc.makeRDD(List( "hello spark", "hello java")) val rdd1: RDD[String] = rdd.flatMap(x => { x.split(" ") }) rdd1.collect().foreach(println) sc.stop() } }
hello spark hello java
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-flatMap //数据类型不一致时,使用模式匹配 val rdd: RDD[Any] = sc.makeRDD(List(List(1,2),3,List(4,5))) val rdd1: RDD[Any] = rdd.flatMap(x => { x match { case x: List[Int] => x case x => List(x) } }) rdd1.collect().foreach(println) sc.stop() } }
1 2 3 4 5
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-glom val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) //List => Int //Int => Array val glomRdd: RDD[Array[Int]] = rdd.glom() glomRdd.collect().foreach(x=>println(x.mkString(","))) sc.stop() } }
1,2 3,4
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-glom val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) //[1,2],[3,4] 分区取最大值 //[2] [4] 最大值求和 //[6] val glomRDD: RDD[Array[Int]] = rdd.glom() val maxRDD: RDD[Int] = glomRDD.map( array => { array.max } ) println(maxRDD.collect().sum) sc.stop() } }
6
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-groupBy val rdd: RDD[String] = sc.makeRDD(List("hello","spark","hadoop","scala","java"),2) val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0)) groupRDD.collect().foreach(println) sc.stop() } } ` ```scala (h,CompactBuffer(hello, hadoop)) (j,CompactBuffer(java)) (s,CompactBuffer(spark, scala))
1.txt
uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:02:05:03 +000 uu wr erw 17/05/2015:12:05:03 +000 uu wr erw 17/05/2015:03:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:03:05:03 +000 uu wr erw 17/05/2015:11:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:11:05:03 +000 uu wr erw 17/05/2015:11:05:03 +000 uu wr erw 17/05/2015:11:05:03 +000 uu wr erw 17/05/2015:02:05:03 +000 uu wr erw 17/05/2015:02:05:03 +000 uu wr erw 17/05/2015:12:05:03 +000 uu wr erw 17/05/2015:12:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt") val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(line => { val datas: Array[String] = line.split(" ") val time: String = datas(3) val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") /* * parse()返回的是一个Date类型数据 * parse方法可以把String型的字符串转换成特定格式的date类型, * 使用parse时字符串长度要和定义的SimpleDateFormat对象长度一致 */ val date: Date = sdf.parse(time) println(date) println("----------------") val sdf1 = new SimpleDateFormat("HH") /* * format返回的是一个String类型的数据 * format方法可以把Date型字符转换成特定格式的String类型, * 如果Date类型和定义的SimpleDateFormat长度不一致会自动在后面补0 */ val hour: String = sdf1.format(date) println(hour) (hour, 1) // 法二:字符串截取 // val str: String = datas(3).substring(11, 13) //(str, 1) }).groupBy(_._1) timeRDD.map { //模式匹配 case (hour, iter) => (hour, iter.size) }.collect().foreach(println) sc.stop() } }
Sun May 17 11:05:03 CST 2015 Sun May 17 10:05:03 CST 2015 ---------------- ---------------- 11 10 Sun May 17 02:05:03 CST 2015 ---------------- Sun May 17 11:05:03 CST 2015 ---------------- 02 11 Sun May 17 11:05:03 CST 2015 ---------------- 11 Sun May 17 02:05:03 CST 2015 ---------------- 02 Sun May 17 02:05:03 CST 2015 ---------------- 02 Sun May 17 12:05:03 CST 2015 ---------------- 12 Sun May 17 12:05:03 CST 2015 ---------------- 12 Sun May 17 03:05:03 CST 2015 ---------------- 03 Sun May 17 10:05:03 CST 2015 ---------------- 10 Sun May 17 10:05:03 CST 2015 ---------------- 10 Sun May 17 12:05:03 CST 2015 ---------------- Sun May 17 10:05:03 CST 2015 ---------------- 12 10 Sun May 17 03:05:03 CST 2015 ---------------- Sun May 17 10:05:03 CST 2015 ---------------- 03 10 Sun May 17 11:05:03 CST 2015 ---------------- 11 Sun May 17 10:05:03 CST 2015 ---------------- 10 (02,3) (11,4) (03,2) (12,3) (10,6)
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-filter val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) rdd.filter(num=>(num%2!=0)).collect().foreach(println) sc.stop() } }
1 3
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-filter val rdd: RDD[String] = sc.textFile("file:///D:\\workspace\\leke-bigdata\\datas\\1.txt") //过滤10点的数据 rdd.filter(line=>{ line.split(" ")(3).startsWith("17/05/2015:10") }).collect().foreach(println) sc.stop() } }
uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000 uu wr erw 17/05/2015:10:05:03 +000
根据指定的规则从数据集中抽取数据
(应用场景:数据倾斜)
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-sample val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10)) //sample算子需要传递三个参数 //1.第一个参数withReplacement表示,抽取数据后是否将数据返回 true(放回),false(丢弃) //2.第二个参数fraction表示:如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念 // 如果抽取放回的场合:表示数据源中的每条数据别抽取的可能次数 //3.第三个参数seed表示,抽取数据时随机算法的种子 // 如果不传递第三个参数,那么使用的是当前系统时间 println(rdd.sample( false, 0.4, 1 ).collect().mkString(",")) println("-------------------") println(rdd.sample( true, 2 ).collect().mkString(",")) sc.stop() } }
1,2,3,7,9 ------------------- 2,2,2,3,3,3,3,4,4,4,5,6,7,7,7,8,8,8,9,9,9,9,10,10,10,10,10,10,10
将数据集中重复的数据去重
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-filter val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,2,3,2)) println(rdd.distinct().collect() mkString (",")) sc.stop() } }
1,2,3,4
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-coalesce val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3) val newRDD: RDD[Int] = rdd.coalesce(2) newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") sc.stop() } }
coalesce方法默认情况下不会将分区的数据打乱重新组合
这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
如果想要让数据均衡,可以进行shuffle处理,第二个参数设为true
coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义的,不起作用
如果想要实现扩大分区的效果,需要使用shuffle操作
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-coalesce val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3) val newRDD: RDD[Int] = rdd.coalesce(2,true) newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") sc.stop() } }
该操作内部其实执行的是 coalesce 操作,底层是coalesce函数,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的RDD,还是将分区数少的 RDD 转换为分区数多的RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-repartition val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2) val newRDD: RDD[Int] = rdd.repartition(3) newRDD.saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") sc.stop() } }
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-sortBy //sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认是升序 //第二个参数可以改变排序的方式,用false //sortBy默认情况下,不会改变分区,但是中间存在shuffle操作 val rdd: RDD[Int] = sc.makeRDD(List(4,3,2,6,4,1)) println(rdd.sortBy(x => x,false).collect().mkString(",")) println("---------------") val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3))) println(rdd1.sortBy(x => x._1).collect().mkString(",")) println("---------------") val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("1",1),("11",2),("2",3))) println(rdd2.sortBy(x => x._1.toInt).collect().mkString(",")) sc.stop() } }
6,4,4,3,2,1 --------------- (1,1),(11,2),(2,3) --------------- (1,1),(2,3),(11,2)
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子-双Value类型 //交集,并集和差集要求两个数据源数据类型保持一致 //拉链操作两个数据源的类型可以不一致 //拉链操作时,两个数据源要求分区数量要保持一致,分区中数据数量保持一致 val rdd1: RDD[Int] = sc.makeRDD(List(1,2,3,4)) val rdd2: RDD[Int] = sc.makeRDD(List(3,4,5,6)) //交集 println(rdd1.intersection(rdd2).collect().mkString(",")) //并集 println(rdd1.union(rdd2).collect().mkString(",")) //差集 println(rdd1.subtract(rdd2).collect().mkString(",")) //拉链 println(rdd1.zip(rdd2).collect().mkString(",")) sc.stop() } }
3,4 1,2,3,4,3,4,5,6 1,2 (1,3),(2,4),(3,5),(4,6)
partitionBy
将数据按照指定Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner
object Test { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5),2) val mapRDD: RDD[(Int, Int)] = rdd.map((_,1)) //partitionBy根据指定的分区规则对数据进行重分区 mapRDD.partitionBy(new HashPartitioner(2)) .saveAsTextFile("file:///D:\\workspace\\leke-bigdata\\output") sc.stop()
reduceByKey
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 //reduceByKey:相同的key的数据进行value数据的聚合操作 //reduceByKey分区内和分区间计算规则是相同的 //Scala语言中一般的聚合操作都是两两聚合,spark基于Scala开发的,所以它的聚合也是两两聚合 //reduceByKey中如果key的数据只有一个,是不会参与运算的 val rdd: RDD[(String, Int)] =sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4))) val rdd1: RDD[(String, Int)] = rdd.reduceByKey((x,y)=>{ println(s"x=${x},y=${y}") x+y }) rdd1.collect().foreach(println) sc.stop() } }
x=1,y=2 x=3,y=3 (a,6) (b,4)
groupByKey
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4))) //groupByKey: 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组 // 元组中的第一个元素就是key // 元组中的第二个元素就是相同key的value的 val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey() rdd1.collect().foreach(println) println("----------------------") val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1) rdd2.collect().foreach(println) sc.stop() } }
(a,CompactBuffer(1, 2, 3)) (b,CompactBuffer(4)) ---------------------- (a,CompactBuffer((a,1), (a,2), (a,3))) (b,CompactBuffer((b,4)))
groupByKey和reduceByKey的区别
aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",6),("b",4),("a",6),("b",2)),2) //取出每个分区内相同 key 的最大值然后分区间相加 //aggregateByKey存在函数柯里化,有两个参数列表 //第一个参数列表,需要传递一个参数,表示为初始值 // 主要用于当碰见第一个key的时候,和value进行分区内计算 //第二个参数列表需要传递两个参数: // 第一个参数表示分区内计算规则 // 第二个参数表示分区间计算规则 val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(5)( (x, y) => math.max(x, y), (x, y) => x + y ) rdd1.collect().foreach(println) sc.stop() } }
(b,11) (a,11)
foldByKey
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2) val rdd1: RDD[(String, Int)] = rdd.foldByKey(0)(_+_) rdd1.collect().foreach(println) sc.stop() } }
(b,6) (a,9)
combineByKey
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2) //combineByKey:方法需要三个参数 //第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作 //第二个参数表示:分区内的计算规则 //第三个参数表示:分区间的计算规则 val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey( v => (v, 1), (t: (Int, Int), v) => { (t._1 + v, t._2 + 1) }, (t1: (Int, Int), t2: (Int, Int)) => { (t1._1 + t2._1, t1._2 + t2._2) } ) rdd1.collect().foreach(println) println("---------------------") val rdd2: RDD[(String, Int)] = rdd1.mapValues { case (num, cnt) => { num / cnt } } rdd2.collect().foreach(println) sc.stop() } }
(b,(6,2)) (a,(9,3)) --------------------- (b,3) (a,3)
reduceByKey、foldByKey、aggregateByKey、combineByKey的区别
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- Key-Value类型 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",4),("a",6),("b",2)),2) //wordcount println(rdd.reduceByKey(_ + _).collect().mkString(",")) println(rdd.aggregateByKey(0)(_+_,_+_).collect().mkString(",")) println(rdd.foldByKey(0)(_+_).collect().mkString(",")) println(rdd.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y:Int)=>(x+y)).collect().mkString(",")) sc.stop() } }
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- join //join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组 //如果两个数据源中key没有匹配上,那么数据不会出现在结果中 //如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2))) val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("d",6),("a",2))) val rdd2: RDD[(String, (Int, Any))] = rdd.join(rdd1) rdd2.collect().foreach(println) sc.stop() } }
(a,(1,g)) (a,(1,2)) (b,(4,5))
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- 左右连接 val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2))) val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5))) val rdd2: RDD[(String, (Int, Option[Any]))] = rdd.leftOuterJoin(rdd1) val rdd3: RDD[(String, (Option[Int], Any))] = rdd.rightOuterJoin(rdd1) rdd2.collect().foreach(println) println("---------------------") rdd3.collect().foreach(println) sc.stop() } }
(a,(1,Some(g))) (b,(4,Some(5))) (c,(2,None)) --------------------- (a,(Some(1),g)) (b,(Some(4),5))
object Spark_rdd_01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RDD").setMaster("local[*]") val sc = new SparkContext(conf) //TODO 算子- cogroup //cogroup-connection+group val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",4),("c",2))) val rdd1: RDD[(String, Any)] = sc.makeRDD(List(("a","g"),("b",5),("b","m"))) val rdd2: RDD[(String, (Iterable[Int], Iterable[Any]))] = rdd.cogroup(rdd1) rdd2.collect().foreach(println) sc.stop() } }
(a,(CompactBuffer(1),CompactBuffer(g))) (b,(CompactBuffer(4),CompactBuffer(5, m))) (c,(CompactBuffer(2),CompactBuffer()))