Java教程

spark大佬总结

本文主要是介绍spark大佬总结,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Spark概述

Hadoop小剧场

  • Hadoop1.x版本的问题

  • Hadoop2.x版本

Spark小剧场

  • 为什么使用函数式编程

什么是Spark

  • Spark是基于内存的快速、通用。可扩展的大数据分析引擎

Spark内置模块

  • 模块分区

    • Spark SQL 结构化数据 | Spark Streaming 实时计算
    • Spark Core
    • 独立调度器 | Yarn | Mesos
  • 模块解释

    • Spark Core

      • 实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
    • Spark SQL

      • 是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等
    • Spark Streaming

      • 是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应
    • Spark MLlib

      • 提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能
    • Spark GraphX

      • GraphX是Spark面向图计算提供的框架与算法库
    • 集群管理器

      • Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。

Spark与Hadoop比较

  • Spark Task的启动时间快。Spark采用fork线程的方式,而Hadoop采用创建新的进程的方式
  • Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互
  • Spark的缓存机制比HDFS的缓存机制高效
  • Spark确实会比MapReduce更有优势 但是基于内存 所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark并不能完全替代MR

Spark运行模式

运行模式

  • Local模式

    • 模式

      • local

        • 所有计算运行在一个线程中
      • local[K]

        • 指定使用几个线程来运行计算,通常CPU有几核,就指定几个线程
      • local[*]

        • 按CPU内部最多的Cores设置线程数
    • 基本语法

      • bin/spark-submit \
      • --class
      • --master \
      • --deploy-mode \
      • --conf = \
      • ... # other options
      • \
      • [application-arguments]
    • 参数说明

      • --master 指定Master的地址,默认为Local
      • --class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
      • --deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
      • --conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”
      • application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
      • application-arguments: 传给main()方法的参数
      • --executor-memory 1G 指定每个executor可用内存为1G
      • --total-executor-cores 2 指定每个executor使用的cup核数为2个
  • Standalone模式

  • Yarn模式

  • Windows 模式

  • k8s模式

  • Mesos模式

端口号

  • Spark查看当前Spark-shell运行任务情况端口号:4040(计算)
  • Spark Master内部通信服务端口号:7077
  • Standalone模式下,Spark Master Web端口号:8080(资源)
  • Spark历史服务器端口号:18080
  • Hadoop YARN任务运行情况查看端口号:8088

运行架构

  • Driver(驱动器)

    • 概述

      • Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行
      • Driver:任务的调度和切分
    • 功用

      • 把用户程序转为作业(JOB)
      • 跟踪Executor的运行状况
      • 为执行器节点调度任务
      • UI展示应用运行状况(4040)
  • Executor(执行器)

    • 概述

      • Spark Executor是一个JVM进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。
      • Executor:任务的执行
      • 在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量
    • 功用

      • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
      • 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
  • 图示

  • 并行度

    • 整个集群并行执行任务的数量称之为并行度
  • 有向无环图(DAG)

    • tez 是作业跟作业之间的有向无环图
    • spark 是作业内部 有向无环图
  • 提交流程

    • Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:Driver程序的运行节点

    • Yarn Client模式

      • Client模式将用于监控和调度的Driver模块在客户端执行,而不是Yarn中,所以一般用于测试
      • Driver在任务提交的本地机器上运行
      • Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
      • ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
      • ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
      • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
      • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行
    • Yarn Cluster模式

      • Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行。一般应用于实际生产环境
      • 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,
      • 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
      • Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
      • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,
      • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行

SparkCore

Spark三大数据结构

  • RDD

    • 弹性分布数据集
  • 累加器

    • 分布式共享只写数据
  • 广播变量

    • 分布式共享只读数据
  • 累加器和广播变量都不需要进行shuffle

RDD

  • RDD概述

    • 什么是RDD

      • Spark是一个分布式数据集的分析框架,将计算单元缩小为更适合分布式计算和并行计算的模型,称之为RDD

      • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。

        • 弹性

          • 自动进行内存和磁盘数据存储的切换
          • 基于Lineage(血统)的高效容错机制
          • Task如果失败,自动进行特定次数的重试
          • Stage如果失败,自动进行特定次数的重试
          • checkpoint和persist,可主动或被动触发
          • 数据调度弹性:DAGScheduler、TaskScheduler与资源管理无关
          • 数据分片的高度弹性(coalesce)
        • 分布式

          • 数据的来源
        • 数据集

          • 数据的类型 & 计算逻辑的封装 (数据模型)
      • 代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

        • 不可变

          • 计算逻辑不可变
        • 可分区

          • 提高消费能力
        • 并行计算

          • 多任务同时执行
      • 图示

        • 一个抽象类 里面含有多个属性 用于对数据的归纳 和规则的存储 这就是数据模型
    • RDD核心属性

      • 分区列表

        • 一组分区(Partition)即数据集的基本组成单位
      • 分区计算函数

        • 一个计算各个分区间的函数
      • RDD之间的依赖关系

        • 一个有关于各个RDD间依赖关系的列表
      • 分区器

        • 一个关于键值key-value分片的Partitioner
      • 首选位置(可选)

        • 一个存储存取每个Partition的优先位置(preferred location)的列表

          • 优先位置是为了利于计算
    • RDD的特点

      • 分区

        • RDD逻辑上是分区的,每个分区的数据是抽象存在的

        • 计算的时候会通过一个compute函数得到每个分区的数据

          • 如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据
          • 如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换
      • 只读

        • 要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD

          • RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系
          • 一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中
      • 依赖

        • RDDs维护着操作算子转换的血缘关系,即依赖

        • 依赖的分类

          • 窄依赖

            • 一对一
          • 宽依赖

            • 多对多
        • 图示

      • 缓存

        • 如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,计算一次后进入缓存,就不再根据血缘关系计算
      • Checkpoint

        • 迭代会使RDDs之间的血缘关系变长,因此RDD引入checkpoint,将数据持久化存储,从而减轻血缘关系的依赖
  • RDD的创建

    • 从集合中创建RDD(内存)

      • val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
      • val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
    • 从外部的存储创建RDD(硬盘)

      • val rdd2= sc.textFile("hdfs://hadoop131:9000/RELEASE")
    • 从其他RDD创建(转换)

      • val mapPartitionsRDD: RDD[String] = textRDD.flatMap(_.split(" "))
  • RDD并行度与分区

    • val mkRDD: RDD[Int] = sc.makeRDD(List(5,6,7,8),2)

    • makeRDD的第一个参数:数据源

    • makeRDD的第二个参数:默认并行度(分区的数量)

    • 首先根据指定的并行度计算

    • 如果没有指定 并行度默认会从spark配置信息中获取spark.default.parallelism值

    • 如果获取不到指定参数,会采用默认值totalCores(机器的总核数)

    • 机器总核数 = 当前环境中可用核数

      • local => 单核(单线程)=> 1
      • local[4] => 4核(4个线程) => 4
      • local[*] => 最大核数 => 8
  • RDD的转换算子

    • Value类型

      • map(fuc)

        • 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
      • mapPartitions(fuc)

        • 类似于map,但独立地在RDD的每一个分片(分区)上运行.
      • map()和mapPartitions()的区别

        • map():每次处理一条数据
        • mapPartitions():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能内存溢出导致OOM
        • 当内存空间较大的时候建议使用mapPartitions(),以提高处理效率
      • mapPartitionsWithIndex(fuc)

        • 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
      • flatMap(fuc)

        • 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
      • glom(配合flatmap可以合并分区)(fuc)

        • 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
      • groupBy(fuc)

        • 分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器
        • val dataRDD = sparkContext.makeRDD(List(1,2,3,4),3)
        • dataRDD.groupBy(_ % 2,2).collect().foreach(println)
          
        • 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
        • 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
      • filter(fuc)

        • 过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
        • 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
        • 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
        • val dataRDD = sparkContext.makeRDD(List(
        •   1,2,3,4
          
        • ),1)
          
        • dataRDD.filter(_ > 2).collect().foreach(println)
          
      • sample(withReplacement, fraction, seed)

        • // 抽取数据不放回(伯努利算法)

        • // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。

        • // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要

        • // 第一个参数:抽取的数据是否放回,false:不放回

        • // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;

        • // 第三个参数:随机数种子 确定结果一致性

        • val dataRDD1 = dataRDD.sample(false, 0.5)

        • // 抽取数据放回(泊松算法)

        • // 第一个参数:抽取的数据是否放回,true:放回;false:不放回

        • // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数

        • // 第三个参数:随机数种子 确定结果一致性

        • val dataRDD2 = dataRDD.sample(true, 2)

        • 用法

          • 在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取数据,查看数据的规则,分析后,可以进行改善处理,让数据更加均匀

          • 伯努利算法中第二个参数 抽取概率的算法

            • 想要抽取的数据量/总的分区数
      • distinct([numTasks])

        • 对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它
      • coalesce(numPartitions)

        • 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率
        • spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
        • Coalesce方法默认情况下无法扩大分区,因为默认不会将数据打乱重新组合。扩大分区是没有意义。如果想要扩大分区,那么必须使用shuffle(第二个参数设置为 true),打乱数据,重新组合
      • repartition(numPartitions)

        • 底层是coalesce操作,参数shuffle的默认值为true
      • coalesce和repartition的区别

        • repartition方法其实就是coalesce方法,只不过肯定使用了shuffle操作。让数据更均衡一些,可以有效防止数据倾斜问题
        • 如果缩减分区,一般就采用coalesce, 如果扩大分区,就采用repartition
      • sortBy(func,[ascending], [numTasks])

        • 使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序
    • 双Value类型交互

      • union(otherDataset)

        • 并集

          • 类型不一致 会报错 并且分区也会并集
      • subtract (otherDataset)

        • 差集

          • 类型不一致 会报错 并且分区不变 得到的结果 数据shuffle重组
      • intersection(otherDataset)

        • 交集

          • 类型不一致 会报错 并且分区不变 得到的结果 数据shuffle重组
      • zip(otherDataset)

        • 拉链

          • 类型不一致 不会报错 但是分区数不一致和分区内数量不一致 会报错
    • Key-Value类型

      • partitionBy

        • 将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

        • 如果重分区的分区器和当前RDD的分区器一样怎么办?

          • 不进行任何的处理。不会再次重分区
          • 对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程
        • Spark还有其他分区器吗?

          • 有一个RangePartitioner,在sortBy中使用
        • 如果想按照自己的方法进行数据分区怎么办?

          • 自定义分区器
      • groupByKey

        • 将分区的数据直接转换为相同类型的内存数组进行后续处理
      • reduceByKey(func, [numTasks])

        • 可以将数据按照相同的Key对Value进行聚合
        • val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
        • val dataRDD2 = dataRDD1.reduceByKey(+)
        • val dataRDD3 = dataRDD1.reduceByKey(+, 2)
      • reduceByKey和groupByKey的区别

        • reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]
        • groupByKey:按照key进行分组,直接进行shuffle
        • 一般情况下reduceByKey比groupByKey更好,但要注意combine的使用条件必须是不影响最终的业务逻辑
      • aggregateByKey

        • 参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)
        • 在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
        • 即计算两次,一次分区内,一次分区外
      • foldByKey

        • aggregateByKey分区内和分区外计算规则相同可用foldByKey
      • combineByKey[C]

        • 类似于aggregate(),但是 combineByKey()允许用户返回值的类型与输入不一致
        • (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
        • createCombiner
        • mergeValue
        • mergeCombiners
        • 将第一个key出现的v转换结构计算规则,第二个参数表示分区内计算规则,第三个参数表示分区间计算规则
      • sortByKey([ascending], [numTasks])

        • 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
      • mapValues

        • 针对于(K,V)形式的类型只对V进行操作
      • join(otherDataset, [numTasks])

        • 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
        • shuffle过程 性能不太高 能不用尽量不要用
      • leftOuterJoin

        • val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
        • val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
        • val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
      • cogroup(otherDataset, [numTasks])

        • 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
        • join leftOuterJoin 底层都是调用的cogroup
  • RDD行动算子

    • reduce(func)

      • 通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
    • collect()

      • 在驱动程序中,以数组的形式返回数据集的所有元素。
      • 把所有分区的结果 采集到当前节点的内存 容易 内存溢出 工作中 慎用
    • count()

      • 返回RDD中元素的个数
    • first()

      • 返回RDD中的第一个元素
    • take(n)

      • 返回一个由RDD的前n个元素组成的数组
    • takeOrdered(n)

      • 返回该RDD排序后的前n个元素组成的数组
    • aggregate

      • (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
      • aggregate 初始值 分区内 分区间都用的上 注意!!
    • fold(num)(func)

      • 折叠操作,aggregate的简化操作,seqop和combop一样
    • saveAsTextFile(path)

    • saveAsSequenceFile(path)

      • key-value 类型可以调用
    • saveAsObjectFile(path)

    • countByKey()

      • 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数
    • countByValue()

      • 底层也是调用的 countByKey()
    • foreach(func)

      • 在数据集的每一个元素上,运行函数func进行更新
    • 算子的逻辑代码在 executor 执行

    • 算子之外的代码在 driver 执行

  • RDD序列化

    • 在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的

    • 函数的序列化 extends Serializable

    • kryo框架序列化

      • val conf: SparkConf = new SparkConf()
        .setAppName("SerDemo")
        .setMaster("local[*]")
        // 替换默认的序列化机制
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // 注册需要使用 kryo 序列化的自定义类
        .registerKryoClasses(Array(classOf[Searcher]))

      • case class Searcher(val query: String) {

    def isMatch(s: String) = {
    s.contains(query)
    }

    def getMatchedRDD1(rdd: RDD[String]) = {
    rdd.filter(isMatch)
    }

    def getMatchedRDD2(rdd: RDD[String]) = {
    val q = query
    rdd.filter(_.contains(q))
    }
    }

      - kryo 将不支持节点发送的情况 traint 不生效 java早期没有考虑到分布式传输的情况 还就得用kryo框架
      - 比java 序列化快10倍 显示的声明
    
    • 闭包检测

      • runjob之前有个clean方法 内部逻辑实现
  • RDD依赖关系

    • 血缘关系

      • 在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区

      • RDD的Lineage会记录RDD的元数据信息和转换行为

      • 如何查看Lineage

        • .toDebugString
      • 如何查看依赖类型

        • .dependencies
    • 窄依赖

      • 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
      • 一对一关系
    • 宽依赖

      • 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle
      • 多对多关系
    • DAG(Directed Acyclic Graph)

      • 原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据
      • 图示
    • 任务划分

      • 名词解释

        • Application

          • 初始化一个SparkContext即生成一个Application,Driver也可理解为Application
        • Job

          • 一个Action算子就会生成一个Job,只针对于Action算子
        • Stage

          • 根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage
        • Task

          • Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task
      • Application->Job->Stage-> Task每一层都是1对n的关系

  • RDD持久化

    • RDD缓存

      • RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中
    • RDD CheckPoint

      • 本质是通过将RDD写入Disk做检查点
      • 检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能
      • 在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除
      • 对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发
    • 缓存和检查点区别

      • Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低 不切断血缘依赖
      • Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高 切断血缘依赖
      • 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD
  • RDD分区器

    • 概述

      • Hash

        • Hash分区为当前的默认分区
      • Range

      • Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数

      • 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None

    • 获取RDD分区

      • partitioner属性
    • Hash分区

      • 计算key的hashcode,与分区个数取余,小于0则加分区个数,大于0则加0
      • 弊端:导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据
    • Ranger分区

      • 将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,但是分区内的元素是不能保证顺序的。

      • 实现步骤

        • 从整个RDD中抽取样本并排序,计算每个分区最大key,形成数组变量
        • 判断key在rangeBounds中的范围,赋予id下标
    • 自定义分区

      • 继承Partitioner

      • 实现方法

        • numPartitions: Int:返回创建出来的分区数。

        • getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

        • equals():Java

          • 判断相等性的标准方法
  • RDD文件读取与保存

    • 文件类数据读取与保存

      • Text文件

        • 读取textFile
        • 保存saveAsTextFile
      • Json文件

        • 使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件
        • import scala.util.parsing.json.JSON
        • 读取textFile
        • 解析map
      • Csv文件

      • Sequence文件

        • Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)
        • SequenceFile文件只针对PairRDD
        • 读取sequenceFile()
        • 保存saveAsSequenceFile()
      • Object文件

        • 对象文件是将对象序列化后保存的文件,采用Java的序列化机制
        • 读取objectFile
        • 保存saveAsObjectFile
    • 文件系统类数据读取与保存

      • HDFS

        • 最抽象的两个函数接口

          • hadoopRDD

            • 输入格式(InputFormat)
            • 键类型
            • 值类型
            • 分区值
          • newHadoopRDD

            • 输入格式(InputFormat)
            • 键类型
            • 值类型
            • 分区值
        • 备注

          • 在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取

            • Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
          • map-reduce如何读取某一类型数据,将该对应读取方式改写为上述的两种接口

      • MySQL数据库

        • jdbcRDD

        • 添加依赖

          • <groupId>mysql</groupId>
            
          • <artifactId>mysql-connector-java</artifactId>
            
          • <version>5.1.27</version>
            
        • 代码详解

          • MySQL读取

            • // 定义连接mysql的参数
            • val driver = "com.mysql.jdbc.Driver"
            • val url = "jdbc:mysql://hadoop102:3306/rdd"
            • val userName = "root"
            • val passWd = "000000"
            • //创建JdbcRDD
            • val rdd = new JdbcRDD(sc, () => {
            •  Class.forName(driver)
              
            •  DriverManager.getConnection(url, userName, passWd)
              
            • },
            •  "select * from `rddtable` where `id`>=?;",
              
            •  1,
              
            •  10,
              
            •  1,
              
            •  r => (r.getInt(1), r.getString(2))
              
            • )
          • MySQL写入

            • def insertData(iterator: Iterator[String]): Unit = {
            • Class.forName ("com.mysql.jdbc.Driver").newInstance()
            • val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
            • iterator.foreach(data => {
            • val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
              
            • ps.setString(1, data) 
              
            • ps.executeUpdate()
              
            • })
            • }
      • HBase数据库

        • 概述

          • Spark 可以通过Hadoop输入格式访问HBase
        • 添加依赖

          • org.apache.hbase
          • hbase-server
          • 1.3.1
          • org.apache.hbase
          • hbase-client
          • 1.3.1
        • 代码详解

          • HBase读取

            • //构建HBase配置信息
            • val conf: Configuration = HBaseConfiguration.create()
              
            • conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
              
            • conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
              
            • //从HBase读取数据形成RDD
            • val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
              
            •   conf,
              
            •   classOf[TableInputFormat],
              
            •   classOf[ImmutableBytesWritable],
              
            •   classOf[Result])
              
          • HBase写入

            • //创建HBaseConf
            • val conf = HBaseConfiguration.create()
            • val jobConf = new JobConf(conf)
            • jobConf.setOutputFormat(classOf[TableOutputFormat])
            • jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
            • //构建Hbase表描述器
            • val fruitTable = TableName.valueOf("fruit_spark")
            • val tableDescr = new HTableDescriptor(fruitTable)
            • tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
            • //创建Hbase表
            • val admin = new HBaseAdmin(conf)
            • if (admin.tableExists(fruitTable)) {
            • admin.disableTable(fruitTable)
              
            • admin.deleteTable(fruitTable)
              
            • }
            • admin.createTable(tableDescr)
            • //定义往Hbase插入数据的方法
            • def convert(triple: (Int, String, Int)) = {
            • val put = new Put(Bytes.toBytes(triple._1))
              
            • put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
              
            • put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
              
            • (new ImmutableBytesWritable, put)
              
            • }
            • //创建一个RDD
            • val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
            • //将RDD内容写到HBase
            • val localData = initialRDD.map(convert)
            • localData.saveAsHadoopDataset(jobConf)
            • }

累加器

  • 累加器就是在Driver程序中定义的变量 并且能让这个变量在Executor端的每个task都有一个变量副本,每个task更新这些副本的值后,传回Driver端进行merge

  • 系统累加器

    • val rdd = sc.makeRDD(List(1,2,3,4,5))
      // 声明累加器
      var sum = sc.longAccumulator("sum");
      rdd.foreach(
      num => {
      // 使用累加器
      sum.add(num)
      }
      )
      // 获取累加器的值
      println("sum = " + sum.value)
  • 自定义累加器

    • class MyAccount extends AccumulatorV2[String,mutable.Map[String, Int]] {

    var map : mutable.Map[String,Int] = mutable.Map()

    override def isZero: Boolean = map.isEmpty

    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccount

    override def reset(): Unit = map.clear()

    override def add(v: String): Unit = {
    map.update(v , map.getOrElseUpdate(v,0) + 1)
    }

    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    var map2 = other.value
    map = map.foldLeft(map2)((map,kv) => {
    map.update(kv._1, map.getOrElseUpdate(kv._1,0) + kv._2)
    map
    })

    }

    override def value: mutable.Map[String, Int] = map
    }

广播变量(调优策略)

  • 声明广播变量 会将数据广播到 每个Executor端的缓存中

  • 用法

    • val broadList: Broadcast[List[(String, Int)]] = sc.broadcast(list)
      rddDate.map(a => {
      var k = a._1
      var count1 = a._2
      var count2= 0
      for((w,v) <- broadList.value) {
      if(w == k) {
      count2 += v
      }
      }
      (k,count1+count2)
      }).collect().foreach(println)

Spark SQL

Spark SQL概述

  • 定义

    • Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,将Spark SQL转换成RDD,然后提交到集群执行

    • 什么是结构化数据

      • 能够用数据或统一的结构(二维表结构)加以表示,我们称之为结构化数据,如数字、符号
    • 什么是非结构化数据

      • 无法用数字或统一的结构表示,如文本、图像、声音、网页等,我们称之为非结构化数据
  • 特点

    • 易整合
    • 统一的数据访问方式
    • 兼容Hive
    • 标准的数据连接
  • DataFrame

    • 定义

      • DataFrame是一个分布式数据容器,除了数据以外,还记录数据的结构信息,即schema,也支持嵌套数据类型(struct、array和map)

      • DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待

      • DataFrame也是懒执行的。性能上比RDD要高,主要原因:

        • 优化的执行计划:查询计划通过Spark catalyst optimiser进行优化

          • 逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程
    • 与RDD的关系图

  • DataSet

    • DataSet比DataFrame多展示了数据的类型(强类型)

SparkSQL编程

  • SparkSession

    • SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合
    • val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
    • val spark : SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
      
  • Session范围

    • 注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
    • df.createGlobalTempView("people")
    • spark.sql("SELECT * FROM global_temp.people").show()
    • spark.newSession().sql("SELECT * FROM global_temp.people").show()
  • DSL语法

    • 注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

    • df.select($"username",$"age" + 1).show

    • df.select('username, 'age + 1).show()

      • df.select('username, 'age + 1 as "newage").show()
      • df.select('username, 'age + 1).where('age > 10).show()
    • df.filter($"age">30).show

    • df.groupBy("age").count.show

  • RDD、DataFrame、DataSet

    • RDD - DataFrame

      • RDD -> DataFrame

        • // RDD转换为DF,DS时,需要增加隐式转换,需要引入spark环境对象的隐式转换规则
        •     import spark.implicits._
          
        •     val df = rdd.toDF("id","name","age")
          
      • DataFrame -> RDD

        • val rdd1 = df.rdd
    • RDD DataSet

      • RDD -> DataSet

        • val userRDD = rdd.map {
        •         case (id, name, age) => {
          
        •             User(id, name, age)
          
        •         }
          
        •     }
          
        •     val userDS = userRDD.toDS()
          
      • DataSet -> RDD

        • val rdd3 = userDS.rdd
    • DataFrame - DataSet

      • DataFrame -> DataSet

        • val ds = df.as[User]
      • DataSet -> DataFrame

        • val df1 = ds.toDF()
    • 图示

    • 共性与差异

      • 共性

        • 三者均是Spark平台下的分布式弹性数据集
        • 都有惰性机制,Action时才运算
        • 根据内存,自动缓存运算
        • 均有partition概念
        • import spark.implicits._,需要导入此包支持操作
      • 差异

        • RDD不支持SparkSQL
        • 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
        • Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
  • 用户自定义函数

    • 弱类型

      • val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List(
        (10, "zhangsan", 1),
        (20, "lisi", 3),
        (10, "lisi", 3),
        (18, "wangwu", 5)
        ))
        rdd.collect()
        val frame = rdd.toDF("age","name","id")
        frame.createOrReplaceTempView("test")

    //定义用户自定义聚合函数
    val udaf = new MyavgUDAF
    spark.udf.register("avg" , udaf)
    spark.sql("select id,avg(age) from test group by id").show()
    - class MyavgUDAF extends UserDefinedAggregateFunction{
    //TODO 输入类型
    override def inputSchema: StructType = {
    StructType(Array(StructField("age", LongType)))
    }
    //TODO 缓冲区操作类型
    override def bufferSchema: StructType = {
    StructType(Array(StructField("totalAges", LongType),
    StructField("count", LongType)
    ))
    }
    //TODO 返回值 类型
    override def dataType: DataType = LongType

    //TODO 函数的稳定性
    override def deterministic: Boolean = true

    //TODO 函数缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
    }
    //TODO 更新缓冲区
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
    }
    //TODO 合并缓冲区
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    //TODO 最终返回值
    override def evaluate(buffer: Row): Any = {
    buffer.getLong(0) / buffer.getLong(1)
    }
    }

    • 强类型

      • val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List(
        (10, "zhangsan", 1),
        (20, "lisi", 3),
        (10, "lisi", 3),
        (18, "wangwu", 5)
        ))
        rdd.collect()
        val frame = rdd.toDF("age","name","id")
        frame.createOrReplaceTempView("test")

    val udaf = new MyavgUDAF
    val ds: Dataset[User] = frame.as[User]
    ds.select(udaf.toColumn).show()
    - case class User(
    age:Long,
    name:String,
    id:Int
    )
    case class avgBuffle (
    var totalage : Long,
    var totalCount : Long
    )
    class MyavgUDAF extends Aggregator[User,avgBuffle,Long] {
    //初始化
    override def zero: avgBuffle = {
    avgBuffle(0L,0L)
    }
    //更新
    override def reduce(b: avgBuffle, a: User): avgBuffle = {
    b.totalage = b.totalage + a.age
    b.totalCount = b.totalCount + 1
    b
    }
    //合并
    override def merge(b1: avgBuffle, b2: avgBuffle): avgBuffle = {
    b1.totalage += b2.totalage
    b1.totalCount += b2.totalCount
    b1
    }
    //返回值
    override def finish(reduction: avgBuffle): Long = {
    reduction.totalage / reduction.totalCount
    }

    //DataSet默认额编解码器,用于序列化,固定写法
    //自定义类型就是produce 自带类型根据类型选择
    override def bufferEncoder: Encoder[avgBuffle] = Encoders.product

    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }

SparkSQL数据源

  • 通用加载/保存方法

    • 手动指定选项

      • Spark SQL的默认数据源为Parquet格式

        • 修改配置项spark.sql.sources.default,可修改默认数据源格式
        • 当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet)
        • 如果数据源格式为内置格式,则只需要指定简称定json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式
      • spark.read.format("json").load

      • peopleDF.write.format("parquet").save

    • 文件保存策略mode()

      • SaveMode.ErrorIfExists(default)
      • SaveMode.Append(有记录 则追加)
      • SaveMode.Overwrite(有记录 则覆盖)
      • SaveMode.Ignore(有记录 则忽略)
    • spark.sql("select * from json.'input/user.json' ")

  • JSON文件

    • Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row].
    • 可以通过SparkSession.read.json()去加载一个 一个JSON 文件
    • spark.read.json
  • Parquet文件

    • Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录
    • Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法
    • spark.read.parquet
  • CSV文件

    • spark.read.format("csv")
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .load("input/people.csv").show()
  • JDBC

    • mysql mysql-connector-java 5.1.27
    • val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/mydb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "mytb1")
      .load()
    • df.write.format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/mydb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "mytb3")
      .save()
  • Hive数据库

    • 内嵌Hive使用

      • org.apache.spark spark-hive_2.12 2.4.5
      org.apache.hive hive-exec 3.1.2 - spark.sql("create table test(id int)") - spark.sql("show tables").show() - spark.sql("load data local inpath 'input/word.txt' into table test ") - spark.sql("select * from test").show() - 会发现项目根目录出现两个hive 元数据文件
    • 外部Hive应用

      • hive-site.xml 添加到resource目录中 即可直接访问外部hive

SparkSQL实战

SparkStreaming

Spark Streaming概述

  • 定义

    • Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列
    • 微批次 准实时的流式处理
  • 特点

    • 易用
    • 容错
    • 易整合
  • 架构

DStream入门

  • Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。

背压机制

DStream创建

  • 文件数据源

    • 文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取

    • 注意事项

      • 文件需要有相同的数据格式;
      • 文件进入 dataDirectory的方式需要通过移动或者重命名来实现;
      • 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
    • val dirDS = ssc.textFileStream("in")

  • RDD队列

    • ssc.queueStream(queueOfRDDs)

    • 用法

      • //3.创建RDD队列
      • val rddQueue = new mutable.Queue[RDD[Int]]()
        
      • //4.创建QueueInputDStream
      • val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
        
      • //8.循环创建并向RDD队列中放入RDD
      • for (i <- 1 to 5) {
        
      •   rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
        
      •   Thread.sleep(2000)
        
      • }
        
  • 从远程端口采集数据

    • ssc.socketTextStream("hadoop102",8888).print()
  • 自定义数据源

    • 用法

      • //3.创建自定义receiver的Streaming
      • ssc.receiverStream(new MyReceiver("hadoop102",9999)).print()
      • class MyReceiver(host:String, port:Int) extends ReceiverString {

    var socket: Socket = _

    def receive(): Unit = {
    val reader: BufferedReader = new BufferedReader(
    new InputStreamReader(
    socket.getInputStream,
    "UTF-8"
    )
    )
    while (true) {
    val str = reader.readLine()
    if(str != null) {
    store(str)
    }
    }
    }

    override def onStart(): Unit = {
    socket = new Socket(host, port)
    new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
    }.start()
    }
    override def onStop(): Unit = {
    socket.close()
    socket = null
    }
    }

  • Kafka数据源

    • 在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它

    • 用法

      • 导入依赖

        • <groupId>org.apache.spark</groupId>
          
        • <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          
        • <version>2.1.1</version>
          
        • <groupId>org.apache.kafka</groupId>
          
        • <artifactId>kafka-clients</artifactId>
          
        • <version>0.11.0.2</version>
          
      • val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount1")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    ConsumerConfig.GROUP_ID_CONFIG -> "test",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaPara))

    //5.将每条消息的KV取出
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

    //6.计算WordCount
    valueDStream.flatMap(.split(" "))
    .map((
    , 1))
    .reduceByKey(_ + _)
    .print()

    //7.开启任务
    ssc.start()
    ssc.awaitTermination()

DStream转换

  • 无状态

    • transform

      • val re: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",8888)

    //TODO code Driver(1)
    re.transform(rdd => {
    //TODO code Driver(n)
    println("ssssssssssssssssssssss")
    rdd.map(a => {
    //TODO code Executor
    a*2
    })
    }).print()

    • join

      • 两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同
  • 有状态

    • updateStateByKey

      • ssc.sparkContext.setCheckpointDir("c1")

    val re: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",8888)

    re.flatMap(.split(" ")).map((,1))
    .updateStateByKey[Long](
    (seq : Seq[Int],op:Option[Long]) => {
    Option(op.getOrElse(0L) + seq.sum)
    }
    )

      - seq 相同key下 数量的集合
      - 类似于缓冲区 buffer 需要落盘 提前声明 setCheckpointDir
    
    • window

      • val re: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",8888)

    re.flatMap(.split(" ")).map((,1)).window(Seconds(9),Seconds(6)).reduceByKey(+).print()
    - 计算周期 == 滑动步长 Seconds(6)

    • countByWindow(windowLength, slideInterval)

      • 返回一个滑动窗口计数流中的元素个数
    • reduceByWindow(func, windowLength, slideInterval)

      • 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流
    • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

      • 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值
    • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

      • 上述函数的变化版本,正向 逆向都支持

DStream输出

  • print()

  • saveAsTextFiles

  • saveAsObjectFiles

  • saveAsHadoopFiles

  • foreachRDD(func)

    • 这是最通用的输出操作 它用来对DStream中的RDD运行任意计算

    • 常见的用例之一是把数据写到诸如MySQL的外部数据库中

    • 注意

        1. 连接不能写在driver层面(序列化)
        1. 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
        1. 增加foreachPartition,在分区创建(获取)

优雅关闭

  • //新启一个线程进行优雅的关闭
    new Thread(new Runnable {
    override def run(): Unit = {
    //依赖第三方 不能通过业务标识进行关闭
    //mysql
    //redis
    //zk
    //kafka
    //hdfs
    while (true) {
    val state: StreamingContextState = ssc.getState()
    //关闭前 进行判断
    if(state == StreamingContextState.ACTIVE) {
    ssc.stop(true,true)
    System.exit(0)//正常关闭JVM
    }
    }
    }
    }).start()

Spark内核解析

1. Spark应用提交

  • (1)Spark向Yarn提交
  • 当使用bin/java执行java程序时,会产生JVM,java的进程
  • (2)ApplicationMaster, Driver, Executor
  • ApplicationMaster是一个进程
  • Driver是一个线程, 但是我们一般会讲SparkContext称之为Driver
  • Executor是一个计算对象, 但有时我们将ExecutorBackend后台通信对象也称之为Executor

2. Spark内部组件及通信

  • (1)通信原理 - IO - RPC
  • 基本的网络通信:Socket, ServerSocket
  • 通信框架:AKKA(旧), Netty(新)(AIO)
  • 三种IO方式:BIO(阻塞式), NIO(非阻塞式), AIO(异步)
  • Linux, windows
  • 在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,因此在性能上没有明显的优势;Windows的AIO底层实现良好,但是Netty开发人员并没有把Windows作为主要使用平台考虑。微软的windows系统提供了一种异步IO技术:IOCP(I/O CompletionPort,I/O完成端口);Linux下由于没有这种异步IO技术,所以使用的是epoll(一种多路复用IO技术的实现)对异步IO进行模拟。所以在Linux上不建议使用AIO
  • (2)组件内部
  • Driver - CoarseGrainedSchedulerBackend - DriverEndpoint
  • Executor - CoarseGrainedExecutorBackend
  • (3)组件之间
  • 通信终端共通类:Endpoint
  • 通信终端:RpcEndpoint(receive)
  • 通信终端引用:RpcEndpointRef(send,ask)

3. Spark作业的提交(调度)

  • (1)Application
  • Yarn中会有Application
  • SparkConf中配置setAppName(“xxxxx”)
  • SparkContext
  • (2)逻辑代码 => RDD
  • RDD的创建
  • RDD的转换
  • RDD的行动
  • (3)Job => 行动算子(N)
  • 触发作业的执行:sc.runJob
  • new ActiveJob
  • (4)Stage的划分 => 转换算子 => 依赖关系
  • 窄依赖,宽依赖
  • 分区的数量
  • 默认不变
  • Shuffle的算子一般都会有改变分区数量的参数
  • 默认分区的数量
  • Local[*]
  • (5)Task的切分(Partition)
  • 任务和阶段的关系
  • 阶段数量 = ResultStage + N * ShuffleDep
  • 阶段类型:ResultStage, ShuffleMapStage
  • 任务和分区的关系
  • 任务的数量 : 当前阶段的分区的数量
  • 任务的总数量
  • 任务的总数量 : 所有阶段的任务总和
  • 任务的类型
  • ShuffleMapState => ShuffleMapTask
  • ResultStage => ResultTask

4. 任务的执行

  • (1)内容

  • 阶段ID

  • RDD元数据,依赖

  • 分区

  • 首选位置

  • (2)序列化(累加器,KRYO)

  • 默认序列化:Java序列化

  • Kryo序列化:shuffle的场合下,如果类型为值类型或字符串类型

  • (3)调度

  • Driver

  • TaskSet - Stage

  • TaskSetManager - Scheduler

  • 调度模式:FIFO(默认), FAIR

  • 任务调度池:Pool(FIFO)

  • Executor

  • 本地化级别(调度)

  • 降级处理:默认等待3s

  • (4)计算

  • 任务的编码

  • 向Executor发送Task

  • Executor接收Task,并进行解码

  • 计算对象执行Task

  • 线程池执行task.run方法, 调用具体Task对象的runTask方法。

  • (5)Shuffle

  • Shuffle map(Write)

  • Shuffle reduce(Read)

  • 管理器:SortShuffleManager

  • ShuffleWriter

  • BypassMergeSortShuffleWriter

  • UnsafeShuffleWriter

  • SortShuffleWriter

  • 写磁盘文件时,首先进行内存中的排序,如果内存(5M)不够,会溢写磁盘,生成临时文件,最终将所有的临时文件合并成数据文件和索引文件。

  • 预聚和的原理:在排序时,构造了一种类似于hashtable的结构,所以想同的key就聚合在一起。

    • 排序规则:首先会按照分区进行排序,然后按照key.hashCode进行排序
  • ShuffleHandle

  • BypassMergeSortShuffleHandle: 没有预聚和功能的算子 & reduce的分区数量 <=阈值(200)

  • SerializedShuffleHandle :Spark的内存优化后的解决方案,对象序列化后不需要反序列化。!(支持序列化重定位 & 支持预聚和 & 分区数量 大于阈值)

  • BaseShuffleHandle :默认的shuffleHandle

  • (6)内存(cache)

    • 静态内存管理

    • 统一内存管理

  • (7)累加器

内存管理

  • Spark 1.6 之前使用的是静态内存管理 (StaticMemoryManager) 机制,StaticMemoryManager 也是 Spark 1.6 之前唯一的内存管理器。

  • 在 Spark1.6 之后引入了统一内存管理 (UnifiedMemoryManager) 机制,UnifiedMemoryManager 是 Spark 1.6 之后默认的内存管理器

  • 1.6 之前采用的静态管理(StaticMemoryManager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用

  • 静态内存管理

  • 统一内存管理

总结

假如topic数据已经不均匀如何做呢?

  • repartition+去掉数据本地性,可以稍微优化

周期性清除Spark Streaming流状态的方法 ?

  • 1.用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本
  • 2.给StreamingContext设置超时 ==> ssc.awaitTerminationOrTimeout(msTillTomorrow)

要想进行相同数据归类到相同分区,肯定要有产生shuffle步骤 ==> 自定义分区器

Spark SQL用UDF实现按列特征重分区?

  • 1.通过 Dataset repartition方法 udf() 根据表达式 对列进行分区操作
  • 2.通过sql 的group by + udf() 对列进行分区操作

Adaptive Execution如何让Spark SQL更高效更好用?

  • (英特尔大数据技术团队和百度大数据基础架构部工程师在Spark 社区版本的基础上,改进并实现的自适应执行引擎)
  • 原有 Shuffle 的问题:
  • Partition 个数不宜设置过大;
  • Reducer(代指 Spark Shuffle 过程中执行 Shuffle Read 的 Task) 个数过多,每个 Reducer 处理的数据量过小。大量小 Task 造成不必要的 Task 调度开销与可能的资源调度开销(如果开启了 Dynamic Allocation);
  • Reducer 个数过大,如果 Reducer 直接写 HDFS 会生成大量小文件,从而造成大量 addBlock RPC,Name node 可能成为瓶颈,并影响其它使用 HDFS 的应用;
  • 过多 Reducer 写小文件,会造成后面读取这些小文件时产生大量 getBlock RPC,对 Name node 产生冲击;
  • Partition 个数不宜设置过小
  • 每个 Reducer 处理的数据量太大,Spill 到磁盘开销增大;
  • Reducer GC 时间增长;
  • Reducer 如果写 HDFS,每个 Reducer 写入数据量较大,无法充分发挥并行处理优势;
  • 很难保证所有 Shuffle 都最优
  • 不同的 Shuffle 对应的数据量不一样,因此最优的 Partition 个数也不一样。使用统一的 Partition 个数很难保证所有 Shuffle 都最优;
  • 定时任务不同时段数据量不一样,相同的 Partition 数设置无法保证所有时间段执行时都最优;
  • 自动设置 Shuffle Partition 原理
  • 使用与优化方法:
  • 可通过spark.sql.adaptive.enabled=true启用 Adaptive Execution 从而启用自动设置 Shuffle Reducer 这一特性。
  • 通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize可设置每个 Reducer 读取的目标数据量,其单位是字节,默认值为 64 MB。上文例子中,如果将该值设置为 50 MB,最终效果仍然如上文所示,而不会将 Partition 0 的 60MB 拆分
  • 动态调整执行计划:
  • SortMergeJoin 原理:
  • MergeSort 的同时,使用 SortMergeJoin 对二者进行 Join
  • BroadcastJoin 原理:
  • 当参与 Join 的一方足够小,可全部置于 Executor 内存中时,可使用 Broadcast 机制将整个 RDD 数据广播到每一个 Executor 中,该 Executor 上运行的所有 Task 皆可直接读取其数据。
  • 使用与优化方法:
  • 该特性的使用方式如下:
  • 当spark.sql.adaptive.enabled与spark.sql.adaptive.join.enabled都设置为true时,开启 Adaptive Execution 的动态调整 Join 功能;
  • spark.sql.adaptiveBroadcastJoinThreshold设置了 SortMergeJoin 转 BroadcastJoin 的阈值。如果不设置该参数,该阈值与spark.sql.autoBroadcastJoinThreshold的值相等;
  • 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive Execution 还可提供其它 Join 优化策略。部分优化策略可能会需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle参数决定了是否允许为了优化 Join 而增加 Shuffle。其默认值为 false。
  • 自动处理数据倾斜:
  • 解决数据倾斜典型方案:
  • 保证文件可 Split 从而避免读 HDFS 时数据倾斜;
  • 保证 Kafka 各 Partition 数据均衡从而避免读 Kafka 引起的数据倾斜;
  • 调整并行度或自定义 Partitioner 从而分散分配给同一 Task 的大量不同 Key;
  • 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 从而避免 Shuffle 引起的数据倾斜;
  • 对倾斜 Key 使用随机前缀或后缀从而分散大量倾斜 Key,同时将参与 Join 的小表扩容,从而保证 Join 结果的正确性。
  • 自动解决数据倾斜:
  • 使用与优化方法:
  • 将spark.sql.adaptive.skewedJoin.enabled设置为 true 即可自动处理 Join 时数据倾斜;
  • spark.sql.adaptive.skewedPartitionMaxSplits控制处理一个倾斜 Partition 的 Task 个数上限,默认值为 5;
  • spark.sql.adaptive.skewedPartitionRowCountThreshold设置了一个 Partition 被视为倾斜 Partition 的行数下限,也即行数低于该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即一千万;
  • spark.sql.adaptive.skewedPartitionSizeThreshold设置了一个 Partition 被视为倾斜 Partition 的大小下限,也即大小小于该值的 Partition 不会被视作倾斜 Partition。其默认值为 64 * 1024 * 1024 也即 64MB;
  • spark.sql.adaptive.skewedPartitionFactor该参数设置了倾斜因子。如果一个 Partition 的大小大于spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于spark.sql.adaptive.skewedPartitionRowCountThreshold的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition。

并不是所有的谓词下推都是好的 也得看下推前 下推后是不是执行逻辑一致 否则结果不一致

  • spark.sql.parquet.filterPushdown 默认是true。设置为true代表开启parquet下推执行优化。
  • spark.sql.parquet.compression.codec 默认是snappy。当写parquet文件的时候设置压缩格式

spark是粗粒度的资源调用,当前运行的Application已经得到了运行所需要的全部资源 不会被外部影响

cache checkpoint 区别?

  • 两者都有存在的意义 首先RDD是高容错的 计算失败 会根据血缘关系重新计算

  • 由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition

  • 由于重新计算体量太大 有时候影响性能,所以有了cache 持久化机制 可以将shuffle的数据 持久化到内存和磁盘等位置。

  • 但是由于cache 是把数据持久化到内存和磁盘 可靠性并不高 所以有个checkpoint机制 可以支持数据持久化到可靠性高的外部存储 比如HDFS等 支持副本。

  • 所以本质上

    • cache 不切断血缘关系 存储在内存和磁盘 可靠性低。
    • checkpoint 切断血缘关系 可靠性高。

Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?

  • 默认情况下是一个es 索引分片对应Spark RDD的一个分区

为啥spark 的broadcast要用单例模式?

  • 1.首先Spark Streaming默认job数是1 默认FIFO调度(容易产生如下问题)

  • Spark Streaming 的job生成是周期性的。当前job的执行时间超过生成周期就会产生job 累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制

  • 2.因为有上面的问题 所以一般会选择FAIR调度模式 并且增加spark.streaming.concurrentJobs 个数

  • conf.set("spark.scheduler.mode", "FAIR")

  • 运行的job就会均分所有executor提供的资源

  • 3.因为Spark Streaming的任务存在Fair模式下并发的情况,所以需要在使用单例模式生成broadcast的时候要注意声明同步:

    • object WordBlacklist {

    @volatile private var instance: Broadcast[Seq[String]] = null

    def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
    synchronized {
    if (instance == null) {
    val wordBlacklist = Seq("a", "b", "c")
    instance = sc.broadcast(wordBlacklist)
    }
    }
    }
    instance
    }
    }

WEB UI 页面中 有些现象不一定是数据倾斜 要看是不是一个stage

  • 如果碰到是一个stage中 每个task数据量都很大 有可能是数据本地化的问题,因为task交给executor执行的时候 要看前面是不是有task执行 有的话默认等待3s 可以把3s 设置成0s 并且core数设置成task个数

  • 本地化调度

    • 进程间级别
    • 节点间级别
    • 机架级别
    • 没有好坏之分
  • 本地化等待时长0 task = core

  • 本地化等待时长6 task => core 2 3 倍数

spark sql 还可读取xml配置文件

双亲委派机制

  • 虚拟机内置的类加载器(Bootstrap ClassLoader)
  • Extension ClassLoader
  • App ClassLoader
  • 如果它也没有加载得到的话,则返回给委托的发起者,由它到指定的文件系统或网络等URL中加载该类。
  • 如果它们都没有加载到这个类时,则抛出ClassNotFoundException异常

堆内 堆外的内存如何使用??

  • 手动指定 动态回收

cache 指定副本数量的 2个?? 存储在哪?

  • 本地缓存 2份

distinct算子也是基于reducebykey实现的。

除了行动算子之外 还有什么算子能执行job?

  • sortByKey
  • 底层RangePartitioner 内部有collect方法调起runjob方法

算子类注意点

  • 宽窄依赖的算子

    • 窄依赖 ==> map filter union
    • 宽依赖 ==> repartition , join, cogroup, 和 *By 或者 *ByKey 类型的操作都会产生shuffle
  • aggregate

    • 作用域 分区内 分区间
    • 谁完成先执行谁
    • 一个结果给下一个结果
  • reduce 同上

  • 在使用完filter算子后,继续调用coalesce算子进行优化。

  • mapPartition 内部是迭代器 遍历转List

  • repartition shuffle

  • coaless no shuffle

  • reduceByKey(func)

  • foldByKey(zero)(func)

  • aggregateByKey(zero)(func1,func2)

  • combinerByKey(func1,func2,func3)

spark streaming+kafka不适合处理顺序性的消息?

  • 可能需要借助第三方存储来存储中间状态,spark streaming处理的时候 拿到全局唯一的id 再对其排序 方能实现有序性

Spark 失败重试与黑名单机制

  • 任务失败 => 重试机制
  • 失败节点的Executor Id和Host的被黑名单记录 并记录拉黑时间
  • 拉黑时间是指不能去上一个失败的节点调度这个task了

Spark速度比MapReduce快,不仅是内存计算??

  • 1.Spark内存计算 VS MapReduce读写磁盘
  • 2.MapReduce 进程级别的。 Spark 线程模型(减少了重新启动JVM进程带来的性能开销)
  • 3.MapReduce shuffle的过程需要经历过多的排序。Spark 分情况排序

Spark shuffle 与 MapReduce shuffle 区别?

  • 1.溢写不同机制的不同

    • mr最后会溢写再合并
    • spark shuffle 不会溢写 而是直接把内存中的数据跟文件进行合并
  • 2.MapReduce 进程级别的。 Spark 线程模型(减少了重新启动JVM进程带来的性能开销)

  • 3.MapReduce shuffle的过程需要经历过多的排序。Spark 分情况排序

SparkSQL的3种Join实现?

  • Broadcast Hash Join:

  • Hash join。 适合 很小的表 + 大表

  • 将小表 广播出去 加载到内存(事实表和维度表的join处理) ===> 小表的边界是10M

  • 再在每个executor上执行单机版hash join,小表映射,大表试探;

  • Shuffle Hash Join:

  • Hash join。 适合 小表 + 小表 或者 小表 + 大表

  • 如果表过大 再广播会对driver端和executor端造成较大的压力。 那么可以分而治之 两个表中,key相同的行都会被shuffle到同一个分区中 分别进行Hash Join。

  • Sort Merge Join:

  • 适合大表 + 大表

  • 如果两个表都很大 那么不适合加载到内存。

  • 那么可以先shuffle 让两张表数据分布到整个集群

  • 再对两个表分别进行排序

  • 分别遍历两个有序序列 碰到相同的join key 就合并 否则取更小的一边。

  • Spark SQL是如何选择join策略的?

    • 策略的选择会按照效率从高到低的优先级来排
    • Broadcast hash join
    • Shuffle hash join
    • Sort merge join

共享变量

  • 广播变量 共享读

    • 给同一个executor下面的多个task
    • 广播变量不要太大 200M 否则可以放到redis里面缓存
  • 累加器 共享写

    • 累加器原理

      • driver 端定义 序列化 => executor端 反序列化 => task执行结果 => driver端 汇总

      • 弊端

        • 不能写在转换算子里面 容易累加
        • map操作 如果不cache或者checkpoint 会重复累加

Spark中的OOM问题

  • map执行中内存溢出

  • shuffle后内存溢出

  • OOM的问题通常出现在execution这块内存中,因为storage这块内存在存放数据满了之后,会直接丢弃内存中旧的数据,对性能有影响但是不会有OOM的问题

  • 解决

    • 增大分区数 repartition
    • 增大内存

SparkStreaming如何解决小文件问题 ?

  • 1.增加batch大小
  • 2.Coalesce 减少分区
  • 3.定时合并HDFS端的小文件
  • 4.利用 foreach 自定义输出计算结果 到达一定阙值 再新开一个文件

谷歌论文 DataFlow

  • 大规模 无边界 乱序的数据集 ==> 解决这种问题的一种数据模型

    • 对无边界,无序的数据源,允许按数据本身的特征进行窗口计算,得到基于事件发生时间的有序结果,并能在准确性、延迟程度和处理成本之间调整
    • CLC 准确性 延迟 成本
    • CAP 一致性 可用性 分区容错性
  • 无论是流式计算/微批次或是批处理,它们要处理的问题都可以抽象为以下几个问题:

    • What: 需要计算的结果数据是什么

      • 在 The Dataflow Model 论文中,计算引擎提供两个原语,ParDo和GroupByKey
      • 通过核心原语,用户可以较为自由地描述计算过程,解答了What的问题
    • Where: 计算的上下文环境是什么

      • Windowing 策略描述了事件处理的上下文,即Where的问题
    • When: 什么时候计算输出结果

      • 窗口的完整性问题。最基础的方案是使用水位线(watermark)来解决,即根据一定算法根据最近处理的事件的事件时间估算出一个称为水位线的时间(比如最近 5 min 到达的事件的最小事件时间),早于该时间的事情被视为已经完全被处理。然而水位线是启发式的,并不能完全避免迟到的事件,另外水位线也可能引入额外的延迟。
      • 由此借鉴了Lambad架构的思想
      • Lambda 架构包含三个核心 view: batch view ,real time view 和 query view
      • 还需要触发器 解决什么时候一个窗口被计算和输出为窗格
      • 即When的问题
      • 其中最为常见的是基于时间,基于数据到达情况的触发器,基于窗口估计完成度(水位线)的触发器,除此以外也支持用户自定义触发器
    • How: 如何修正早期计算结果

      • 除了控制窗口结果计算何时触发,触发器还提供了三种重定义模式来控制同一窗口的不同窗格关联情况,以规范化How。
      • 抛弃: 窗口触发后,窗口内的数据被丢弃,不同窗格之间没有联系。
      • 累积: 窗口触发后,窗口内的数据仍被保留在持久化的状态中,而后期的计算结果是对上一次结果的一个修正的版本。
      • 累计和撤回: 窗口触发后,在进行累积语义的基础上,计算结果的一份复制也被保留到持久化状态中。当该窗口将来再次触发时(迟到的数据抵达),上一次的结果值先下发做撤回处理,然后将新的结果作为正常数据下发。
      • 其中抛弃模式效率最高,因为不需要维持状态或者过去窗口的数据,但应用场景有限,适合下游实现了累积统计的情况;累积模式与 Lambda 架构十分相似,适用场景广泛,是最常见的模式;累积和撤回模式是最复杂成本最高的模式,需要下游也支持可撤回操作,适合上游操作会导致下游不同的key分布的情况。
  • dataflow 的思想 => spark streaming 微批次处理 (这段时间给flink 发家的机会) => spark 2.0 版本 结构化流(统一批流 api

  • 但是实质上 还是周期性 + 微批次处理 100hm级别 checkpoint + 预写日志 --> 精准一次性) => spark 2.3 版本 结构化流 连续模式 实验性的发布 真正利用 dataflow思想 lamdba batch view ,real time view 和 query view 的思想 利用触发器 + checkpoint 间隔时间

  • 连续处理 **秒 记录查询的进度 触发器再次启动查询(相当于一边实时记录查询的进度 一边触发启动任务的查询)

  • Structured Streaming

    • 是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于event_time的时间窗口的处理逻辑

    • (Spark Streaming) VS (Structured Streaming)

      • Spark Streaming 不足

        • Processing Time 而不是 Event Time
        • Complex, low-level api
        • exactly-once
        • 批流代码不统一
      • Structured Streaming 优势

        • 简洁的模型,用户可以直接把一个流想象成是无限增长的表格
        • 一致的 API
        • 多语言支持
        • Structured Streaming会通过checkpoint和预写日志等机制来实现Exactly-Once语义
        • 复用 Spark SQL 的执行引擎
        • 通过统一api使得使用更简单
        • 通过端到端的 exactly-once 语义 使得跟其他系统更好的集成
        • 通过复用spark sql 引擎保证高性能
    • 版本特性

      • spark 2.0 版本

        • 合并dataframe和datasets

        • 新增结构化流

          • (提供了Event time, windowing, sessions, sources & sink等API 连接流式数据与静态数据集
          • 交互式查询结果:通过JDBC server将RDD结果暴露出去,以便于交互式查询)
        • 干掉了 HashShuffle 加入了BypassShuffle

        • 新增高级函数

          • A),get_json_object()
          • B),from_json()
          • C),to_json()
          • D),explode()
          • E),selectExpr()
      • spark 2.2 版本

        • Structured Streaming的生产环境支持已经就绪
      • spark 2.3 版本

        • 双流 join
        • 为结果化流 提供了一个更高级的api 连续模式 支持低延迟(~1 ms)端到端,并保证at-least-once
        • 与默认的微批处理引擎相比,默认的micro-batch processing可以保证exactly-once语义,但最多只能实现约100ms的延迟(检查点 和 预习日志)
      • spark 2.4 版本

        • 添加了35个高阶函数,用于在 Spark SQL 中操作数组/map
        • Structured Streaming 的各种增强功能。 例如,连续处理(continuous processing)中的有状态操作符
      • spark 3.0 版本

        • 自适应查询优化

          • 动态减少 Reducer 的数量
          • 将 Sort Merge Join 转换为 Broadcast Hash Join
          • 处理数据倾斜:
          • SparkSQL自适应框架可以根据预先的配置在作业运行过程中自动检测是否出现倾斜,并对检测到的倾斜进行优化处理。
          • 优化的主要逻辑是对倾斜的partition进行拆分由多个task来进行处理,最后通过union进行结果合并。
        • 动态分区修剪

          • 动态分区裁剪就是基于运行时(run time)推断出来的信息来进一步进行分区裁剪

          • 要求都是分区表 并且字段在on条件内 还需要spark.sql.optimizer.dynamicPartitionPruning.enabled 设置为true

          • 还需要另外两个参数达到一定的条件 才支持分区修剪

          • 静态分区裁剪

            • Spark 在编译 SQL 的时候自动将 Filter 算子下推到数据源,也就是在 Scan 前进行了 Filter 操作,将 day_of_week = 'Mon' 的数据全部拿出来,其他数据不需要的拿出,这样 Spark SQL 中处理的数据就变少了,整个 SQL 的查询数据就会变快,这一切都是编译的时候(compile time)进行的,所以这个叫做静态分区裁剪
        • 支持 Hadoop 3.x

        • 结构化流式用户界面

ShuffleManager发展概述

  • 未经优化的HashShuffleManager

  • 优化后的HashShuffleManager

  • SortShuffleManager(普通运行机制)

    • shuffle 溢写条件

      • elementsRead % 32 == 0 && currentMemory >= 5M

        • 申请不到内存 就会溢写
      • reduceByKey这种预聚合的算子 ==> 选用Map进行聚合 ==> 写入内存

      • join 这种普通的shuffle算子 ==> 选用Array ==> 写入内存

      • 溢写前 ==> 排序 ==> 分批次(10000条) 写入磁盘

      • 写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    • task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

  • bypass运行机制

一条 SQL 在 Apache Spark 的执行流程?

  • SQL 解析阶段 - SparkSqlParser

    • ANTLR是一款强大的语法生成器工具: 定义一个语法文件 然后ANTLR会根据语法文件生成几个java类 一个是语法解析器 一个是词法解析器
    • 然后两个解析器会对sql 构建对应的语法树
  • 绑定逻辑计划阶段 - Analyzer

  • 优化逻辑计划阶段 - Optimizer

    • 列裁剪
    • 谓词下推
    • 常量累加
    • 常量替换
  • 生成可执行的物理计划阶段 - SparkPlanner

  • 全阶段代码生成阶段 - WholeStageCodegen

  • 代码编译

  • SQL 执行

  • catalyst是sql的一个解析器引擎

  • Catalyst工作流程

    • Parser -- ANTLR ==> 语法树
    • Analyzer -- 遍历语法树上的每个节点 绑定数据类型和函数
    • Optimizer(逻辑计划) -- SQL优化器核心执行策略主要分为两个大的方向:基于规则优化(RBO)以及基于代价优化(CBO)
    • 基于规则的优化策略实际上就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换。
    • 比较常见的规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)。
    • Physical Planning(物理计划) -- 逻辑计划变为Spark可以真正执行的物理计划
    • 使用explain方法查看物理执行计划

Apache Spark 内存管理详解

  • Spark内存均特指Executor的内存

  • Spark内存“规划式”管理:对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存

    • 申请内存的过程

      • Spark在代码中new一个对象实例
      • JVM从堆内内存分配空间,创建对象并返回对象引用
      • Spark保存该对象的引用,记录该对象占用的内存
    • 释放内存的过程

      • Spark记录该对象释放的内存,删除该对象的引用
      • 等待JVM的垃圾回收机制释放该对象占用的堆内内存
  • 堆内内存 分为几个部分组成

    • 存储内存: 存储RDD缓存 广播变量的
    • 计算内存: 存储算子计算的中间结果的
    • 用户内存: 存储用户元数据的
    • 预留内存: 预留一部分内存 存储Spark 对象的(OOM)
  • 为什么设置300M预留内存?

    • Spark内存中有可能存在非序列化对象 这部分对象的内存申请大小是根据抽样估算而来,并且有一些被标记释放的对象没有被JVM回收 那么这种情况 就会造成Spark内存的误差 所以Spark并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常
    • 因为之前是按照比例来算的 那么内存比较小的化 预留内存也会很小 造成OOM
    • 所以堆内内存 预留了300M的内存
  • 堆外内存

    • Spark可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的GC扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差
  • 内存管理模式:静态 统一

    • 静态内存管理模型 老版本的 比例都是固定的 新手容易造成存储和执行内存 一半有大量空闲内存 一半内存打满的情况
    • 统一内存模型 新版本的 内存可动态调整(动态占用机制) 如果是执行内存被占用 可以把存储内存溢写到磁盘 归还内存 反之就不能归还 因为需要保证shuffle计算的内存。
    • 虽然可以动态调整 看似高枕无忧 但是如果存储空间太大 或者缓存的数据过多 反而会导致频繁的全量垃圾回收 降低执行任务的性能。实际生产中 还是需要多方考虑的
  • 内存管理

    • 存储内存

      • RDD持久化机制

        • 依赖于块管理器:BlockManager

          • BlockManagerMaster 负责存储元数据信息 协调各个BlockManager的工作

          • BlockManager 负责数据存储和传递

            • DiskStore:负责磁盘存储
            • MemoryStore:负责内存存储
            • BlockManagerWorker:负责接收相关请求并执行任务
            • ConnectionManager:负责与其他BlockManager建立连接
          • shuffle write的时候 触发BlockManager写操作 先写内存 再写磁盘() 如果有副本 则进行同步

          • shuffle read的时候 先读本地 如果没有再从别的BlockManager 读取

      • RDD缓存

        • (unroll 展开) 底层linkedHashMap Iterator(Block) LRU算法 淘汰 + 落盘 机制
        • 存储内存中都是一些无序的块 把乱序的块放入到一个linkHashMap 结构里面保证有序的过程就是展开操作
    • 执行内存

      • 申请内存 内存不足 阻塞 等待其他任务释放内存
      • Spark用AppendOnlyMap来存储Shuffle过程中的数据,当其大到一定程度 会溢写磁盘并被合并

背压机制

  • 令牌桶原理。动态调节数据接收速率的功能

    • spark.streaming.backpressure.enabled为true即可开启反压
  • 有Receiver时反压原理

    • Receiver控制速率
    • Receiver负责接收数据
    • 如果接受的数据是一条条上报 使用BlockGenerator
    • 控制BlockGenerator的处理速度,BlockGenerator阻塞了也就相当于间接阻塞了Receiver接受速率。
    • 如果Receiver一次接收并上报一批数据就不会使用BlockGenerator 背压不起作用
    • BlockGenerator采用令牌桶算法实现速率控制:
    • BlockGenerator在接收Receiver传递过来的数据时调用waitToPush去获取令牌了,
    • 没有令牌时,BlockGenerator阻塞,那么Receiver也会阻塞下去
  • 没有Receiver时反压原理

    • 对接Kafka =>DirectKafkaInputDStream时的反压实现
    • 通过评估每秒接收的数据量R 计算每个分区的最大的消费记录数m,
    • 然后提交job运行后KafkaRDD的compute方法开始从kafka消费m条记录

spark 消费Kafka

  • 默认5s 自动提交(5s能做完的任务) + HBase / ES ==> 精准一次性消费
  • 手动提交 + HBase / ES ==> 精准一次性消费
  • 比MySQL加事务的方式 效率高
  • Receiver
    0-8
    offset存储:ZK
    Direct
    0-8
    offset存储:CK, val ssc = StreamingContext.getActiveOrCreate("./ck",()=>SSC)
    手动维护(Redis,Mysql,ES,HBase)
    0-10
    offset存储:__consumer_offsets
    手动维护(Redis,Mysql,ES,HBase)

优化

常规性能调优

  • 最优资源配置

    • bin/spark-submit
      --class com.atguigu.spark.WordCount
      --master yarn
      --deploy-mode cluster
      --num-executors 80
      --driver-memory 6g
      --executor-memory 6g
      --executor-cores 3
      --queue root.default
      --conf spark.yarn.executor.memoryOverhead=2048
      --conf spark.core.connection.ack.wait.timeout=300
      /usr/local/spark/spark.jar
  • RDD优化

    • RDD复用
    • RDD持久化
    • RDD尽可能早的filter操作
  • 并行度调节

  • 广播大变量

  • Kryo序列化

  • 调节本地化等待时长

算子调优

  • mapPartitions 代替普通的map
  • foreachPartition 减少数据库连接
  • filter与coalesce的配合使用
  • repartition解决SparkSQL低并行度问题
  • reduceByKey预聚合 代替groupByKey

数据倾斜

  • 数据倾斜发生时的现象?

    • 1.绝大多数task执行得都非常快,但个别task执行极慢
    • 2.原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常
  • 如何定位导致数据倾斜的代码?

    • 数据倾斜只会发生在shuffle过程中 这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的
    • 1.某个task执行特别慢的情况。先看Web UI页面 时间长的task 在哪个stage内 从而推断是哪个算子造成的数据倾斜。
    • 2.某个task莫名其妙内存溢出的情况。通过堆栈信息 直接定位某行代码造成的OOM
  • 如何查看导致数据倾斜的key的数据分布情况?

    • 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。

    • 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码

      • val sampledPairs = pairs.sample(false, 0.1)
      • val sampledWordCounts = sampledPairs.countByKey()
      • sampledWordCounts.foreach(println(_))
  • 数据倾斜的解决方案

    • 一:上游是hive的情况

      • 使用Hive ETL预处理数据

        • 把数据倾斜的问题交给上游处理

        • 优点

          • 简单
        • 缺点

          • 指标不治本。上游还会有数据倾斜的问题
        • 实践经验

          • 在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上
    • 二:极少的key造成的数据倾斜 并且这些key无关紧要的情况

      • 过滤少数导致倾斜的key

        • 优点

          • 简单
        • 缺点

          • 适用场景不多
        • 实践经验

          • 在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉
    • 三:处理数据倾斜最简单的一种方案

      • 提高shuffle操作的并行度

        • 优点

          • 实现起来比较简单,可以有效缓解和减轻数据倾斜的影响
        • 缺点

          • 只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限
        • 实践经验

          • 该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用
    • 四:聚合类的shuffle操作引发的数据倾斜

      • 加盐+两阶段聚合(局部聚合+全局聚合)

        // 第一步,给RDD中的每个key都打上一个随机前缀。

        JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(

            new PairFunction<Tuple2<Long,Long>, String, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
        
                        throws Exception {
        
                    Random random = new Random();
        
                    int prefix = random.nextInt(10);
        
                    return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
        
                }
        
            });
        

        // 第二步,对打上随机前缀的key进行局部聚合。

        JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(

            new Function2<Long, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Long call(Long v1, Long v2) throws Exception {
        
                    return v1 + v2;
        
                }
        
            });
        

        // 第三步,去除RDD中每个key的随机前缀。

        JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(

            new PairFunction<Tuple2<String,Long>, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
        
                        throws Exception {
        
                    long originalKey = Long.valueOf(tuple._1.split("_")[1]);
        
                    return new Tuple2<Long, Long>(originalKey, tuple._2);
        
                }
        
            });
        

        // 第四步,对去除了随机前缀的RDD进行全局聚合。

        JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(

            new Function2<Long, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Long call(Long v1, Long v2) throws Exception {
        
                    return v1 + v2;
        
                }
        
            });
        
        • 将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图
    • 五:join操作 小表和大表

      • 将reduce join转为map join

        // 首先将数据量比较小的RDD的数据,collect到Driver中来。

        List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()

        // 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。

        // 可以尽可能节省内存空间,并且减少网络传输性能开销。

        final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

        // 对另外一个RDD执行map类操作,而不再是join类操作。

        JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(

            new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
        
                        throws Exception {
        
                    // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
        
                    List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
        
                    // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
        
                    Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
        
                    for(Tuple2<Long, Row> data : rdd1Data) {
        
                        rdd1DataMap.put(data._1, data._2);
        
                    }
        
                    // 获取当前RDD数据的key以及value。
        
                    String key = tuple._1;
        
                    String value = tuple._2;
        
                    // 从rdd1数据Map中,根据key获取到可以join到的数据。
        
                    Row rdd1Value = rdd1DataMap.get(key);
        
                    return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
        
                }
        
            });
        

        // 这里得提示一下。

        // 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。

        // 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,

        // 在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。

        // rdd2中每条数据都可能会返回多条join后的数据。

        • 广播小表
    • 六:join操作 大表和大表(数据倾斜key不多的情况)

      • 采样倾斜key并拆分join操作

        // 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。

        JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

        // 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。

        // 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。

        // 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。

        JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(

            new PairFunction<Tuple2<Long,String>, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
        
                        throws Exception {
        
                    return new Tuple2<Long, Long>(tuple._1, 1L);
        
                }
        
            });
        

        JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(

            new Function2<Long, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Long call(Long v1, Long v2) throws Exception {
        
                    return v1 + v2;
        
                }
        
            });
        

        JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(

            new PairFunction<Tuple2<Long,Long>, Long, Long>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
        
                        throws Exception {
        
                    return new Tuple2<Long, Long>(tuple._2, tuple._1);
        
                }
        
            });
        

        final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

        // 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。

        JavaPairRDD<Long, String> skewedRDD = rdd1.filter(

            new Function<Tuple2<Long,String>, Boolean>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Boolean call(Tuple2<Long, String> tuple) throws Exception {
        
                    return tuple._1.equals(skewedUserid);
        
                }
        
            });
        

        // 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。

        JavaPairRDD<Long, String> commonRDD = rdd1.filter(

            new Function<Tuple2<Long,String>, Boolean>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Boolean call(Tuple2<Long, String> tuple) throws Exception {
        
                    return !tuple._1.equals(skewedUserid);
        
                }
        
            });
        

        // rdd2,就是那个所有key的分布相对较为均匀的rdd。

        // 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,

        // 并对rdd中的数据使用flatMap算子都扩容100倍。

        // 对扩容的每条数据,都打上0~100的前缀。

        JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(

             new Function<Tuple2<Long,Row>, Boolean>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
        
                    return tuple._1.equals(skewedUserid);
        
                }
        
            }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Iterable<Tuple2<String, Row>> call(
        
                        Tuple2<Long, Row> tuple) throws Exception {
        
                    Random random = new Random();
        
                    List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
        
                    for(int i = 0; i < 100; i++) {
        
                        list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
        
                    }
        
                    return list;
        
                }
        
        
        
            });
        

        // 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。

        // 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(

            new PairFunction<Tuple2<Long,String>, String, String>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<String, String> call(Tuple2<Long, String> tuple)
        
                        throws Exception {
        
                    Random random = new Random();
        
                    int prefix = random.nextInt(100);
        
                    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
        
                }
        
            })
        
            .join(skewedUserid2infoRDD)
        
            .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>,
        
                       Long, Tuple2<String, Row>>() {
        
                            private static final long serialVersionUID = 1L;
        
                            @Override
        
                            public Tuple2<Long, Tuple2<String, Row>> call(
        
                                Tuple2<String, Tuple2<String, Row>> tuple)
        
                                throws Exception {
        
                                long key = Long.valueOf(tuple._1.split("_")[1]);
        
                                return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
        
                            }
        
                        });
        

        // 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

        // 将倾斜key join后的结果与普通key join后的结果,uinon起来。

        // 就是最终的join结果。

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

        • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
        • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD
        • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD
        • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了
        • 而另外两个普通的RDD就照常join即可
        • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果
    • 七:RDD中有大量的key导致数据倾斜

      • 使用随机前缀和扩容RDD进行join

        // 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。

        JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(

            new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
        
                        throws Exception {
        
                    List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
        
                    for(int i = 0; i < 100; i++) {
        
                        list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
        
                    }
        
                    return list;
        
                }
        
            });
        

        // 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。

        JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(

            new PairFunction<Tuple2<Long,String>, String, String>() {
        
                private static final long serialVersionUID = 1L;
        
                @Override
        
                public Tuple2<String, String> call(Tuple2<Long, String> tuple)
        
                        throws Exception {
        
                    Random random = new Random();
        
                    int prefix = random.nextInt(100);
        
                    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
        
                }
        
            });
        

        // 将两个处理后的RDD进行join即可。

        JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

        • 类似于方案六 只是不拆分

        • 缺点

          • 对内存资源要求很高

shuffle相关参数调优

  • spark.shuffle.file.buffer
  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.reducer.maxSizeInFlight
  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.shuffle.io.maxRetries
  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
  • spark.shuffle.io.retryWait
  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
  • spark.shuffle.memoryFraction
  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
  • spark.shuffle.manager
  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
  • spark.shuffle.sort.bypassMergeThreshold
  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
  • spark.shuffle.consolidateFiles
  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

面试

reducebykey,groupbykey的区别等等类似算子对比,如何高效使用mappartition,然后foreachPartition与foreach之间的区别及底层实现原理

来点猛料,广播变量的原理及演变过程,使用场景,使用广播变量一定划算吗?大变量咋办呢?Spark sreaming定期更新广播变量的实现

累加器的原理及应用场景,累加器使用有陷阱么

  • 累加器 不能写在转换算子里面 容易累加

序列化,反序列化,闭包,垃圾回收机制(过期rdd的回收,cache的回收,shuffle数据回收等)

内存申请,kafka分区设置的依据是啥?

blockrdd和kafkardd的底层区别

广播变量的使用及释放机制等

动态分区发现和topic发现机制

XMind: ZEN - Trial Version

这篇关于spark大佬总结的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!