First, create a Scala project from a Maven archetype.
Archetype:
group: net.alchim31.maven
artifact: scala-archetype-simple
version: 1.7
repository: https://maven.aliyun.com/repository/central
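For reference, the project can also be generated from this archetype on the command line; a minimal sketch, where the groupId, artifactId and version of the new project are assumptions chosen to match the com.antg package used in the code below:

mvn archetype:generate \
  -DarchetypeGroupId=net.alchim31.maven \
  -DarchetypeArtifactId=scala-archetype-simple \
  -DarchetypeVersion=1.7 \
  -DgroupId=com.antg \
  -DartifactId=spark-sql-demo \
  -Dversion=1.0-SNAPSHOT \
  -DinteractiveMode=false

The Aliyun repository listed above is normally configured as a mirror in Maven's settings.xml rather than passed on the command line.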
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
</properties>

<!-- Scala library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<!-- Spark Core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
    <version>2.3.2</version>
    <scope>provided</scope>
</dependency>
<!-- Spark SQL -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>2.3.2</version>
    <scope>provided</scope>
</dependency>
<!-- Log4j 2 (log4j-core is the runnable artifact; org.apache.logging.log4j:log4j is only an aggregator POM) -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.14.1</version>
</dependency>
package com.antg.main

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrames1_6 {
  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DF 1.6 test")
    // create the Spark context
    val sc = new SparkContext(conf)
    // create the SQL context (the pre-2.0 entry point)
    val sqlContext = new SQLContext(sc)
    // read the data
    val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
    // show all rows
    df.show()
    // stop the context
    sc.stop()
  }
}
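The data.json file read above is not included in the original notes; a minimal sample that the code can consume might look like the following (one JSON object per line, which is the format Spark's JSON reader expects; the field names and values are assumptions):

{"name":"tom","age":22}
{"name":"jerry","age":20}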
package com.antg.main

import org.apache.spark.sql.SparkSession

object DataFrames2_3 {
  def main(args: Array[String]): Unit = {
    // create the SparkSession (the unified entry point since Spark 2.0)
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("dataframes2.3")
      .getOrCreate()
    // create the DataFrame from a JSON file
    val df = sparkSession.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
    // register a temporary view (createTempView returns Unit)
    df.createTempView("vrTable")
    sparkSession.sql("select * from vrTable").show()
    // persist the data as Parquet files
    df.repartition(2).write.format("parquet").save("./data")
    // stop the session
    sparkSession.stop()
  }
}
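To verify the persisted output, the Parquet directory written above can be read back into a DataFrame. A minimal sketch, placed before sparkSession.stop(); note that save fails if ./data already exists, since the default save mode is ErrorIfExists:

    // read the Parquet files written by the previous step and check the contents
    val parquetDF = sparkSession.read.parquet("./data")
    parquetDF.show()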
package com.antg.main

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RDD_DF {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("test_rdd to df")
      .master("local[*]")
      .getOrCreate()
    // build the schema: four nullable string columns
    val scheme = StructType(
      "stdno name classId className".split(" ").map(t => StructField(t, StringType, true))
    )
    // read the text file as an RDD of lines
    val lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
    // split each tab-separated line and wrap the fields in a Row
    val rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0), row(1), row(2), row(3)))
    // combine the Row RDD with the schema to get a DataFrame
    val df = sparkSession.createDataFrame(rowRDD, scheme)
    df.show()
    df.printSchema()
    sparkSession.stop()
  }
}
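Besides building a StructType by hand, the schema can be inferred by reflection from a case class. A minimal sketch of that alternative, assuming the same tab-separated file; the StudentRecord case class and its field names are introduced here only for illustration:

// the case class must sit at the top level (outside main) so Spark can derive its encoder
case class StudentRecord(stdno: String, name: String, classId: String, className: String)

// inside main, after the SparkSession has been created:
import sparkSession.implicits._
val reflectedDF = sparkSession.sparkContext
  .textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
  .map(_.split("\t"))
  .map(f => StudentRecord(f(0), f(1), f(2), f(3)))
  .toDF()
reflectedDF.printSchema()
reflectedDF.show()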
package com.antg.main

import org.apache.spark.sql.SparkSession

case class Student(name: String, age: BigInt)

object TestDS {
  def main(args: Array[String]): Unit = {
    // create the SparkSession
    val sparkSession = SparkSession.builder()
      .appName("ds test")
      .master("local[*]")
      .getOrCreate()
    // bring the implicit conversions and encoders into scope
    import sparkSession.implicits._
    // create a Dataset from basic types
    val a = Seq(1, 2, 3).toDS()
    // operate on the Dataset
    a.map(_ + 1).collect.foreach(println)
    a.show()
    // create a Dataset from a case class
    val b = Seq(Student("tom", 22)).toDS()
    b.show()
    // create a Dataset from a file, using the case class to define its typed schema
    val path = "C:\\Users\\Administrator\\Desktop\\student_data.txt"
    val c = sparkSession.read.json(path).as[Student]
    c.show()
    // since the Dataset is strongly typed, its elements can be accessed directly
    c.foreach(x => println(x.age))
    // stop the session
    sparkSession.stop()
  }
}
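Building on the Dataset example, a DataFrame and a Dataset can be converted into each other, and the typed API lets the compiler check field access. A minimal sketch, assuming it runs inside the same main with import sparkSession.implicits._ in scope:

    // Dataset -> DataFrame: the static type is dropped, rows become generic Row objects
    val studentDF = c.toDF()
    studentDF.show()

    // DataFrame -> Dataset: re-attach the Student type with as[...]
    val studentDS = studentDF.as[Student]
    // typed operations: field names and types are checked at compile time
    studentDS.filter(_.age > 15).map(_.name).show()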
Contents of student_data.txt (one JSON object per line):
{"name":"张一","age":10,"address":"国际庄"} {"name":"张二","age":20} {"name":"张三","age":30} {"name":"张四","age":40}