今天继续学习 Spark RDD 的算子
(1)flatMap
/** Demonstrates `flatMap` by flattening an RDD of integer lists into an RDD of integers. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator11")
  val sc = new SparkContext(conf)

  // Every element of the source RDD is itself a List[Int].
  val nested: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))

  // flatMap needs a collection per element; handing back the inner list unchanged
  // makes its members the elements of the resulting RDD.
  val flattened: RDD[Int] = nested.flatMap(inner => inner)

  for (value <- flattened.collect()) println(value)

  sc.stop()
}
/** Demonstrates `flatMap` by splitting each sentence of an RDD into its space-separated words. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator11")
  val sc = new SparkContext(conf)

  val sentences: RDD[String] = sc.makeRDD(List("hello word", "hello spark"))

  // Splitting on a single space yields an array per sentence; flatMap flattens
  // those arrays into one RDD of words.
  val words: RDD[String] = sentences.flatMap(_.split(" "))

  for (word <- words.collect()) println(word)

  sc.stop()
}
/**
 * Demonstrates `flatMap` on an RDD whose elements are a mix of lists and bare values.
 *
 * Because the element types differ, a pattern match is used: lists are passed through
 * as they are, and any other value is wrapped in a single-element list first.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator11")
  val sc = new SparkContext(conf)

  val mixed = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))

  val flattened: RDD[Any] = mixed.flatMap {
    case nested: List[_] => nested
    // Not a list: wrap it so flatMap still receives a collection.
    case single          => List(single)
  }

  for (value <- flattened.collect()) println(value)

  sc.stop()
}
(2)glom
/** Demonstrates `glom`: turns an `RDD[Int]` into an `RDD[Array[Int]]` and prints each array. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator")
  val sc = new SparkContext(conf)

  // Four numbers spread over two partitions.
  val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

  // glom converts the Int elements into arrays.
  val grouped: RDD[Array[Int]] = numbers.glom()

  // Render each array as a comma-separated line.
  grouped
    .collect()
    .map(_.mkString(","))
    .foreach(println)

  sc.stop()
}
/** Uses `glom` to take the maximum of each partition's array and prints the sum of those maxima. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator")
  val sc = new SparkContext(conf)

  // Four numbers spread over two partitions.
  val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

  // glom converts the Int elements into arrays.
  val grouped: RDD[Array[Int]] = numbers.glom()

  // Reduce every array to its largest element.
  val maxima: RDD[Int] = grouped.map(_.max)

  // Bring the maxima back to the driver and add them up.
  val total = maxima.collect().sum
  println(total)

  sc.stop()
}
(3)groupBy
/** Demonstrates `groupBy` by grouping integers on their remainder modulo 2. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator")
  val sc = new SparkContext(conf)

  val numbers: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

  // The grouping key is the remainder of the number divided by 2.
  val byRemainder: RDD[(Int, Iterable[Int])] = numbers.groupBy((n: Int) => n % 2)

  for (group <- byRemainder.collect()) println(group)

  sc.stop()
}
/** Demonstrates `groupBy` by grouping words on their first character. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator")
  val sc = new SparkContext(conf)

  val words: RDD[String] = sc.makeRDD(List("hello", "spark", "hi", "sss"), 2)

  // Words sharing the same leading character end up in the same group.
  val byInitial: RDD[(Char, Iterable[String])] = words.groupBy(word => word.charAt(0))

  for (group <- byInitial.collect()) println(group)

  sc.stop()
}
/**
 * Counts the lines of `data/apache.log` per hour of day and prints each `(hour, count)` pair.
 *
 * The fourth space-separated field of every line is parsed as a `dd/MM/yyyy:HH:mm:ss`
 * timestamp, and its two-digit hour (`HH`) is used as the grouping key. A line whose
 * fourth field is missing or does not parse still fails the job, as before.
 */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sparkContext = new SparkContext(sparkConf)
  try {
    // Read the apache.log file line by line.
    val lines = sparkContext.textFile("data/apache.log")

    val hourly: RDD[(String, Iterable[(String, Int)])] = lines
      .mapPartitions { partition =>
        // Build the formatters once per partition instead of twice per log line.
        // They stay local to this partition's iterator, so the non-thread-safe
        // SimpleDateFormat instances are never shared.
        val timestampFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val hourFormat = new SimpleDateFormat("HH")
        partition.map { line =>
          // Split on spaces and take the fourth field as the timestamp.
          val fields = line.split(" ")
          val timestamp = timestampFormat.parse(fields(3))
          // One hit for this hour, e.g. ("08", 1).
          (hourFormat.format(timestamp), 1)
        }
      }
      .groupBy(_._1)

    hourly
      .map { case (hour, hits) => (hour, hits.size) }
      .collect()
      .foreach(println)
  } finally {
    // Stop the context even when reading or parsing fails.
    sparkContext.stop()
  }
}
(4)filter
/** Demonstrates `filter` by keeping only the odd numbers of an RDD. */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Operator")
  val context = new SparkContext(conf)

  val numbers = context.makeRDD(List(1, 2, 3, 4), 2)

  // filter keeps the elements for which the predicate is true.
  val odds: RDD[Int] = numbers.filter(n => !(n % 2 == 0))

  for (n <- odds.collect()) println(n)

  context.stop()
}
/**
 * Prints every line of `data/apache.log` whose fourth space-separated field
 * starts with `17/05/2015`.
 *
 * Lines with fewer than four fields (for example blank lines) are dropped by the
 * filter instead of aborting the job with an `ArrayIndexOutOfBoundsException`.
 */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(sparkConf)
  try {
    val lines = sc.textFile("data/apache.log")

    lines
      .filter { line =>
        // Split on spaces; the fourth field is compared against the date prefix.
        val fields = line.split(" ")
        // Guard the index: a short line has no fourth field and cannot match.
        fields.length > 3 && fields(3).startsWith("17/05/2015")
      }
      .collect()
      .foreach(println)
  } finally {
    // Stop the context even when reading the file fails.
    sc.stop()
  }
}