* 1.Rdd文件读取和保存 可以从两个角度来区分 * 文件格式 : text、json、csv、sequence文件、Object序列化文件 * 文件系统 : 本地文件系统、hdfs、hbase、各类型数据库
* 1.SparkContext.textFile * 语法 : * def textFile(path: String,minPartitions: Int = defaultMinPartitions) * : RDD[String] * 功能 : * 从hdfs、本地文件系统(任何节点可用),或者任何文件系统中读取数据 * 并返回 Rdd[String]对象(文本文件的每一行都会成为Rdd的一个元素) * 参数 : * path : 文件系统路径,可用是hdfs的路径(多路径时用,分割) * minPartitions : Rdd的最小分区数 * 返回值 : * 返回一个Rdd,文本文件的每一行都会成为Rdd的一个元素 * note : * 文本文件的编码必须是UTF-8 * *2.SparkContext.wholeTextFiles * 语法 : * def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions) * : RDD[(String, String)] * 功能 : * 从hdfs、本地文件系统(任何节点可用),或者任何文件系统中读取数据,每个文件都会作为单个记录读取 * 并返回 RDD[(String, String)]对象(key:文件路径名称,value:文件内容) * 参数 : * path : 文件系统路径,可用是hdfs的路径(多路径时,用,分割) * minPartitions : Rdd的最小分区数 * 返回值 : * 返回一个Rdd,文本文件的每一行都会成为Rdd的一个元素
* 1.rdd.saveAsTextFile * 语法 : * def saveAsTextFile(path: String): Unit = withScope * def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit * 功能 : * 将rdd保存为指定目录下的text * 参数 : * path : 文件系统路径,可用是hdfs的路径 * codec : 压缩算法实现类
object textTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) //1.读取指定路径下的text文件,返回rdd,每一行为rdd的一个元素 private val rdd1: RDD[String] = sc.textFile("Spark_319/src/data/input2") //2.读取指定路径下的text文件,返回rdd,每个文件作为rdd的一个元素(key:文件路径,value:文件内容) private val rdd2: RDD[(String, String)] = sc.wholeTextFiles("Spark_319/src/data/input2") rdd1.collect().foreach(println(_)) println("***********************") rdd2.collect().foreach(println(_)) //3.保存rdd到指定目录 rdd1.saveAsTextFile("Spark_319/src/data/01") rdd2.saveAsTextFile("Spark_319/src/data/02") sc.stop() }