今天学习spark的一些行动算子和序列化
(1)行动算子
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3))) rdd.saveAsTextFile("output") rdd.saveAsObjectFile("output1") //saveAsSequenceFile要求数据类型为key-value类型 rdd.saveAsSequenceFile("output2") sc.stop() }
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3))) val stringToLong: collection.Map[String, Long] = rdd.countByKey() print(stringToLong) sc.stop() }
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3))) //driver端中内存循环打印 rdd.collect().foreach(println) println("0000000000") //executol端中内存循环打印 rdd.foreach(println) sc.stop() }
(2)RDD 序列化(使用算子外的数据要进行序列化,不然就意味着无法传值给 Executor端执行)
在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1,2,3,4)) val user=new User() rdd.foreach( num=>{ println("age="+(user.age+num)) } ) sc.stop() } //序列化对象,如果不序列化,无法传对象到executor端进行处理会报错 class User extends Serializable{ var age:Int=30 } //样例类编译时会自动序列化,跟上面一样 case class User1(){ var age:Int=30 }