Spark DataFrames support all basic SQL join types: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS and SELF JOIN. Spark SQL joins are wide transformations: the resulting data is shuffled across the network, so they can cause serious performance problems when not designed carefully.
On the other hand, Spark SQL joins come with more optimizations by default (thanks to DataFrames & Datasets); even so, there are still some performance considerations to keep in mind when using them.
In this article, you will learn the different join syntaxes and how to use the different join types on DataFrames and Datasets, with examples in Scala.
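As a hedged illustration of that performance point (a sketch only, not part of the original examples): when one side of the join is small, broadcasting it lets Spark perform a broadcast hash join and avoids shuffling the larger side across the network. The empDF and deptDF used here are the DataFrames created later in this article.

import org.apache.spark.sql.functions.broadcast

// Sketch: broadcast the small deptDF so the join avoids a full shuffle of empDF.
empDF.join(broadcast(deptDF), empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .show(false)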
Listed below are all the Spark SQL join types and syntaxes.
join(right: Dataset[_]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
The rest of the tutorial explains the join types using syntax 6, which takes the right DataFrame, a join expression and the type of join as a string.
对于语法4 & 5你可以使用"JoinType" 或者"Join 字符串"定义在上边的表用"JoinType"字符串参数.当你使用"JoinType",你应该import org.apache.spark.sql.catalyst.plans._
作为定义JoinType对象.
JOINTYPE | JOIN STRING | EQUIVALENT SQL JOIN |
---|---|---|
Inner.sql | inner | INNER JOIN |
FullOuter.sql | outer, full, fullouter, full_outer | FULL OUTER JOIN |
LeftOuter.sql | left, leftouter, left_outer | LEFT JOIN |
RightOuter.sql | right, rightouter, right_outer | RIGHT JOIN |
Cross.sql | cross | |
LeftAnti.sql | anti, leftanti, left_anti | |
LeftSemi.sql | semi, leftsemi, left_semi | |
All join objects are defined as JoinType objects; in order to use them you need to import org.apache.spark.sql.catalyst.plans.{LeftOuter, Inner, ....}.
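A minimal sketch of this (an assumption based on the table above, and relying on Spark's internal catalyst package, using the empDF and deptDF created below): a JoinType object's sql string can be passed as the joinType argument.

import org.apache.spark.sql.catalyst.plans.Inner

// Inner.sql evaluates to "INNER", which the joinType string argument accepts.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), Inner.sql)
  .show(false)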
Before we jump into the Spark SQL join examples, first, let's create the emp and dept DataFrames. Here, the emp_id column is unique in emp, dept_id is unique in the dept dataset, and emp_dept_id in emp references dept_id in the dept dataset.
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
  (2,"Rose",1,"2010","20","M",4000),
  (3,"Williams",1,"2010","10","M",1000),
  (4,"Jones",2,"2005","10","F",2000),
  (5,"Brown",2,"2010","40","",-1),
  (6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
  "emp_dept_id","gender","salary")
import spark.sqlContext.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)

val dept = Seq(("Finance",10),
  ("Marketing",20),
  ("Sales",30),
  ("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
Inner join is the default join, and it is also the most commonly used. It joins two DataFrames/Datasets on key columns, and rows whose keys don't match in both datasets are dropped from the result.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
  .show(false)
When we apply an inner join to our datasets, it drops the emp record with emp_dept_id = 50 and the dept record with dept_id = 30. Below is the output of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
The outer join, also called full or fullouter join, returns all rows from both Spark DataFrames/Datasets; where the join expression doesn't match, it returns null for the respective record's columns.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer").show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full").show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter").show(false)
Spark left, a.k.a. left outer join, returns all rows from the left DataFrame/Dataset regardless of whether a match is found in the right dataset; when the join expression doesn't match, null is assigned to the right-side columns of that record, and right-side records without a match are dropped.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
  .show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
  .show(false)
In our dataset, emp_dept_id = 50 has no matching record in dept, hence the dept columns (dept_name & dept_id) of that record are null, and the dept record with dept_id = 30 has been dropped. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Spark right, a.k.a. right outer join, is the opposite of the left join: it returns all rows from the right DataFrame/Dataset regardless of whether a match is found on the left; when the join expression doesn't match, null is assigned to the left-side columns of those records, and left-side records without a match are dropped.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
  .show(false)
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
  .show(false)
In our example, the right dataset's dept_id = 30 has no matching record in the left dataset emp, hence this record contains null in the emp columns; and emp_dept_id = 50 found no match on the right and has been dropped. Below is the output.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Spark leftsemi join is similar to an inner join; the difference is that a leftsemi join returns all columns from the left dataset and ignores all columns from the right dataset. In other words, this join returns only the left-dataset rows that have a match in the right dataset; non-matching rows are ignored from both the left and right datasets.
The same result can be achieved by performing an inner join and then selecting only the left-side columns from the result; however, using leftsemi is more efficient (a sketch of the equivalent appears after the output below).
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
  .show(false)
Output:
leftsemi join
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
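As noted above, the same rows can be reproduced with an inner join followed by selecting only the left-side columns; a minimal sketch (not part of the original example):

// Equivalent of the leftsemi join above, assuming dept_id is unique in deptDF
// (with duplicate matches on the right, a dropDuplicates would also be needed).
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .select(empDF.columns.map(empDF(_)): _*)
  .show(false)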
Left anti join does the exact opposite of the leftsemi join: a leftanti join returns only the left-dataset columns for rows that have no match on the right.
empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
  .show(false)
Output:
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Spark joins are not complete without a self join. Although there is no self-join type available, we can use any of the join types explained above to join a DataFrame to itself. The example below uses an inner self join.
empDF.as("emp1").join(empDF.as("emp2"),
  col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
  .select(col("emp1.emp_id"),col("emp1.name"),
    col("emp2.emp_id").as("superior_emp_id"),
    col("emp2.name").as("superior_emp_name"))
  .show(false)
Here we join the emp dataset with itself to find the superior emp_id and name for every employee.
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
Since Spark SQL supports native SQL syntax, we can also write join operations after creating temporary tables and run them with spark.sql().
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)

val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object JoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

  println("Inner join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner")
    .show(false)

  println("Outer join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer")
    .show(false)

  println("full join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full")
    .show(false)

  println("fullouter join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter")
    .show(false)

  println("right join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right")
    .show(false)

  println("rightouter join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter")
    .show(false)

  println("left join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left")
    .show(false)

  println("leftouter join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter")
    .show(false)

  println("leftanti join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti")
    .show(false)

  println("leftsemi join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi")
    .show(false)

  println("cross join")
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"cross")
    .show(false)

  println("Using crossJoin()")
  empDF.crossJoin(deptDF).show(false)

  println("self join")
  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  //SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)
}
In this tutorial, you have learned the Spark SQL join types INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS and SELF joins and how to use them, along with the corresponding Scala code.