写在前面: 博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己!
之前博主利用业余时间,梳理了一份《SparkSQL编程系列》,奈何当时考虑不周,写的不是很详细。于是在正式开始学习了之后,决定整理一篇适合像我一样的小白级别都能看得懂的IDEA操作SparkSQL教程,于是就有了下文…
码字不易,先赞后看,养成习惯!
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --> <repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <scala.compat.version>2.11</scala.compat.version> <hadoop.version>2.7.4</hadoop.version> <spark.version>2.2.0</spark.version> </properties> <dependencies> <!--<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version> </dependency>--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version> </dependency>--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency> <!--<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.0-cdh5.14.0</version> </dependency>--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> </dependencies>
Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:
第1种:指定列名添加Schema
第2种:通过StructType指定Schema
第3种:编写样例类,利用反射机制推断Schema
下面将针对上面出现的三种类型为大家一一展示
这里我们先准备好数据源tt.txt
19 zhhshang 66 20 lisi 66 19 wangwu 77 31 zhaoliu 66 19 maqi 88
object Demo01 { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext //设置日志级别 sc.setLogLevel("WARN") //2.读取文件 val fileRDD: RDD[String] = sc.textFile("in/person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt)) //3.将RDD转成DF //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换 import spark.implicits._ val personDF: DataFrame = rowRDD.toDF("id","name","age") //查询前十行数据 personDF.show(10) //打印元数据信息 personDF.printSchema() //关闭资源 sc.stop() spark.stop() } }
运行结果
object Demo02 { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext //设置日志级别 sc.setLogLevel("WARN") //2.读取文件 val fileRDD: RDD[String] = sc.textFile("in/tt.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) // 将数据封装成ROW类型的 行数据 val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt)) //3.将RDD转成DF //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换 //import spark.implicits._ //设置表的一个模式 // val schema: StructType = StructType(Seq( StructField("id", IntegerType, true), //允许为空 StructField("name", StringType, true), StructField("age", IntegerType, true)) ) val personDF: DataFrame = spark.createDataFrame(rowRDD,schema) // //查询前十行数据 personDF.show(10) //打印元数据信息 personDF.printSchema() //关闭资源 sc.stop() spark.stop() } }
运行结果
object Demo03 { // 定义一个样例类 case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileRDD: RDD[String] = sc.textFile("in/tt.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.将RDD转成DF //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通过反射自动获取到并添加给DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() sc.stop() spark.stop() } }
运行结果
可以发现以上三种方法都可以成功创建DataFrame/DataSet,接下来讲解的是在利用SparkSQL花式查询数据。
object QueryDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL") .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileRDD: RDD[String] = sc.textFile("in/person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.将RDD转成DF //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通过反射自动获取到并添加给DF val personDF: DataFrame = rowRDD.toDF personDF.show(10) personDF.printSchema() //=======================SQL方式查询======================= //0.注册表 personDF.createOrReplaceTempView("t_person") //1.查询所有数据 spark.sql("select * from t_person").show() //2.查询age+1 spark.sql("select age,age+1 from t_person").show() //3.查询age最大的两人 spark.sql("select name,age from t_person order by age desc limit 2").show() //4.查询各个年龄的人数 spark.sql("select age,count(*) from t_person group by age").show() //5.查询年龄大于30的 spark.sql("select * from t_person where age > 30").show() //=======================DSL方式查询======================= //1.查询所有数据 personDF.select("name","age") //2.查询age+1 personDF.select($"name",$"age" + 1) //3.查询age最大的两人 personDF.sort($"age".desc).show(2) //4.查询各个年龄的人数 personDF.groupBy("age").count().show() //5.查询年龄大于30的 personDF.filter($"age" > 30).show() sc.stop() spark.stop() } }
查询出来的结果很多,博主就不贴长图出来了。感兴趣的朋友可以复制代码自行测试。
RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作
object TransformDemo { case class Person(id:Int,name:String,age:Int) def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileRDD: RDD[String] = sc.textFile("in/person.txt") val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" ")) val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt)) //3.将RDD转成DF //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换 import spark.implicits._ //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息 //所以SparkSQL可以通过反射自动获取到并添加给DF //=========================相互转换====================== //1.RDD-->DF val personDF: DataFrame = personRDD.toDF //2.DF-->RDD val rdd: RDD[Row] = personDF.rdd //3.RDD-->DS val DS: Dataset[Person] = personRDD.toDS() //4.DS-->RDD val rdd2: RDD[Person] = DS.rdd //5.DF-->DS val DS2: Dataset[Person] = personDF.as[Person] //6.DS-->DF val DF: DataFrame = DS2.toDF() sc.stop() spark.stop() } }
作为一个经典的案例,初学SparkSQL怎么能少得了WordCount的身影呢,下面为大家带来的就是使用SparkSQL完成WordCount的开发过程。同样,分为SQL风格和DSL风格~
words.txt
hadoop hadoop spark spark spark java java sqoop sqoop jdk jdk hive hive hive hbase hbase flume flume oozie oozie flink flink flink hello hello hello scala scala
object WordCount { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileDF: DataFrame = spark.read.text("in/words.txt") val fileDS: Dataset[String] = spark.read.textFile("in/words.txt") //fileDF.show() //fileDS.show() //3.对每一行按照空格进行切分并压平 //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.对上面的数据进行WordCount wordDS.createOrReplaceTempView("t_word") val sql = """ |select value ,count(value) as count |from t_word |group by value |order by count desc """.stripMargin spark.sql(sql).show() sc.stop() spark.stop() } }
运行结果
object WordCount2 { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileDF: DataFrame = spark.read.text("in/words.txt") val fileDS: Dataset[String] = spark.read.textFile("in/words.txt") //fileDF.show() //fileDS.show() //3.对每一行按照空格进行切分并压平 //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String import spark.implicits._ val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String //wordDS.show() /* +-----+ |value| +-----+ |hello| | me| |hello| | you| ... */ //4.对上面的数据进行WordCount wordDS.groupBy("value").count().orderBy($"count".desc).show() sc.stop() spark.stop() } }
运行结果
本次的分享就到这里了,关于SparkSQL最基础的内容就在这里了,受益或对大数据技术感兴趣的朋友记得点赞关注(^U^)ノ~YO 后续博主还会更SparkSQL一些进阶拓展的内容,敬请期待(✪ω✪)