Java教程

sparkSQL2.x的join

本文主要是介绍sparkSQL2.x的join,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

package cn.edu360.day8

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/10/16.
  */
object JoinTest {

  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
    //import org.apache.spark.sql.functions._
		
    //spark.sql.autoBroadcastJoinThreshold=-1
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    //spark.conf.set("spark.sql.join.preferSortMergeJoin", true)

    //println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    val df1 = Seq(
      (0, "playing"),
      (1, "with"),
      (2, "join")
    ).toDF("id", "token")

    val df2 = Seq(
      (0, "P"),
      (1, "W"),
      (2, "S")
    ).toDF("aid", "atoken")


    df2.repartition()

    //df1.cache().count()

    val result: DataFrame = df1.join(df2, $"id" === $"aid")

    //查看执行计划
    result.explain()

    result.show()

    spark.stop()


  }

}

spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, -1)
Broadcast Join默认是10M -1代表不使用,可以修改-1的值

merge join 先进行每个表的排序,然后join
hashShuffle join 对数值取hash到不同分区进行join操作(默认)

这篇关于sparkSQL2.x的join的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!