JOIN操作是非常常见的数据处理操作,Spark作为一个统一的大数据处理引擎,提供了非常丰富的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,希望对你有所帮助。本文主要包括以下内容:
数据集的大小
参与JOIN的数据集的大小会直接影响Join操作的执行效率。同样,也会影响JOIN机制的选择和JOIN的执行效率。
JOIN的条件
JOIN的条件会涉及字段之间的逻辑比较。根据JOIN的条件,JOIN可分为两大类:等值连接和非等值连接。等值连接会涉及一个或多个需要同时满足的相等条件。在两个输入数据集的属性之间应用每个等值条件。当使用其他运算符(运算连接符不为=)时,称之为非等值连接。
JOIN的类型
在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。主要有以下几种JOIN类型:
Spark提供了5种JOIN策略来执行具体的JOIN操作。该5种JOIN策略如下所示:
简介
当要JOIN的表数据量比较大时,可以选择Shuffle Hash Join。这样可以将大表进行按照JOIN的key进行重分区,保证每个相同的JOIN key都发送到同一个分区中。如下图示:
如上图所示:Shuffle Hash Join的基本步骤主要有以下两点:
简介
也称之为Map端JOIN。当有一张表较小时,我们通常选择Broadcast Hash Join,这样可以避免Shuffle带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用Broadcast Hash Join将维表进行Broadcast。这样可以避免数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而提高JOIN的效率。在进行 Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,会造成 Driver 端出现 OOM。具体如下图示:
Broadcast Hash Join主要包括两个阶段:
longMetric("dataSize") += dataSize if (dataSize >= (8L << 30)) { throw new SparkException(s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB") }
简介
该JOIN机制是Spark默认的,可以通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用Sort Merge Join。一般在两张大表进行JOIN时,使用该方式。Sort Merge Join可以减少集群中的数据传输,该方式不会先加载所有数据的到内存,然后进行hashjoin,但是在JOIN之前需要对join key进行排序。具体图示:
Sort Merge Join主要包括三个阶段:
简介
如果 Spark 中两张参与 Join 的表没指定join key(ON 条件)那么会产生 Cartesian product join,这个 Join 得到的结果其实就是两张行数的乘积。
条件
简介
该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.
在Cartesian 与Broadcast Nested Loop Join之间,如果是内连接,或者非等值连接,则优先选择Broadcast Nested Loop策略,当时非等值连接并且一张表可以被广播时,会选择Cartesian Join。
条件与特点
等值连接的情况
有join提示(hints)的情况,按照下面的顺序
没有join提示(hints)的情况,则逐个对照下面的规则
有join提示(hints),按照下面的顺序
没有join提示(hints),则逐个对照下面的规则
join策略选择的源码片段
object JoinSelection extends Strategy with PredicateHelper with JoinSelectionHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) => def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = { getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { buildSide => Seq(joins.BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, nonEquiCond, planLater(left), planLater(right))) } } def createShuffleHashJoin(onlyLookingAtHint: Boolean) = { getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { buildSide => Seq(joins.ShuffledHashJoinExec( leftKeys, rightKeys, joinType, buildSide, nonEquiCond, planLater(left), planLater(right))) } } def createSortMergeJoin() = { if (RowOrdering.isOrderable(leftKeys)) { Some(Seq(joins.SortMergeJoinExec( leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right)))) } else { None } } def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition))) } else { None } } def createJoinWithoutHint() = { createBroadcastHashJoin(false) .orElse { if (!conf.preferSortMergeJoin) { createShuffleHashJoin(false) } else { None } } .orElse(createSortMergeJoin()) .orElse(createCartesianProduct()) .getOrElse { val buildSide = getSmallerSide(left, right) Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, nonEquiCond)) } } createBroadcastHashJoin(true) .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } .orElse(createShuffleHashJoin(true)) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) if (canBuildLeft(joinType)) BuildLeft else BuildRight } def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { val maybeBuildSide = if (buildLeft && buildRight) { Some(desiredBuildSide) } else if (buildLeft) { Some(BuildLeft) } else if (buildRight) { Some(BuildRight) } else { None } maybeBuildSide.map { buildSide => Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), buildSide, joinType, condition)) } } def createCartesianProduct() = { if (joinType.isInstanceOf[InnerLike]) { Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) } else { None } } def createJoinWithoutHint() = { createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf)) .orElse(createCartesianProduct()) .getOrElse { Seq(joins.BroadcastNestedLoopJoinExec( planLater(left), planLater(right), desiredBuildSide, joinType, condition)) } } createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } .getOrElse(createJoinWithoutHint()) case _ => Nil } }
总结
本文主要介绍了Spark提供的5种JOIN策略,并对三种比较重要的JOIN策略进行了图示解析。首先对影响JOIN的因素进行了梳理,然后介绍了5种Spark的JOIN策略,并对每种JOIN策略的具体含义和触发条件进行了阐述,最后给出了JOIN策略选择对应的源码片段。希望本文能够对你有所帮助。