Hadoop1.x版本的问题
Hadoop2.x版本
模块分区
模块解释
Spark Core
Spark SQL
Spark Streaming
Spark MLlib
Spark GraphX
集群管理器
Local模式
模式
local
local[K]
local[*]
基本语法
参数说明
Standalone模式
Yarn模式
Windows 模式
k8s模式
Mesos模式
Driver(驱动器)
概述
功用
Executor(执行器)
概述
功用
图示
并行度
有向无环图(DAG)
提交流程
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:Driver程序的运行节点
Yarn Client模式
Yarn Cluster模式
RDD
累加器
广播变量
累加器和广播变量都不需要进行shuffle
RDD概述
什么是RDD
Spark是一个分布式数据集的分析框架,将计算单元缩小为更适合分布式计算和并行计算的模型,称之为RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据(计算)抽象。
弹性
分布式
数据集
代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。
不可变
可分区
并行计算
图示
RDD核心属性
分区列表
分区计算函数
RDD之间的依赖关系
分区器
首选位置(可选)
一个存储存取每个Partition的优先位置(preferred location)的列表
RDD的特点
分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的
计算的时候会通过一个compute函数得到每个分区的数据
只读
要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD
依赖
RDDs维护着操作算子转换的血缘关系,即依赖
依赖的分类
窄依赖
宽依赖
图示
缓存
Checkpoint
RDD的创建
从集合中创建RDD(内存)
从外部的存储创建RDD(硬盘)
从其他RDD创建(转换)
RDD并行度与分区
val mkRDD: RDD[Int] = sc.makeRDD(List(5,6,7,8),2)
makeRDD的第一个参数:数据源
makeRDD的第二个参数:默认并行度(分区的数量)
首先根据指定的并行度计算
如果没有指定 并行度默认会从spark配置信息中获取spark.default.parallelism值
如果获取不到指定参数,会采用默认值totalCores(机器的总核数)
机器总核数 = 当前环境中可用核数
RDD的转换算子
Value类型
map(fuc)
mapPartitions(fuc)
map()和mapPartitions()的区别
mapPartitionsWithIndex(fuc)
flatMap(fuc)
glom(配合flatmap可以合并分区)(fuc)
groupBy(fuc)
dataRDD.groupBy(_ % 2,2).collect().foreach(println)
filter(fuc)
1,2,3,4
),1)
dataRDD.filter(_ > 2).collect().foreach(println)
sample(withReplacement, fraction, seed)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子 确定结果一致性
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子 确定结果一致性
val dataRDD2 = dataRDD.sample(true, 2)
用法
在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取数据,查看数据的规则,分析后,可以进行改善处理,让数据更加均匀
伯努利算法中第二个参数 抽取概率的算法
distinct([numTasks])
coalesce(numPartitions)
repartition(numPartitions)
coalesce和repartition的区别
sortBy(func,[ascending], [numTasks])
双Value类型交互
union(otherDataset)
并集
subtract (otherDataset)
差集
intersection(otherDataset)
交集
zip(otherDataset)
拉链
Key-Value类型
partitionBy
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
如果重分区的分区器和当前RDD的分区器一样怎么办?
Spark还有其他分区器吗?
如果想按照自己的方法进行数据分区怎么办?
groupByKey
reduceByKey(func, [numTasks])
reduceByKey和groupByKey的区别
aggregateByKey
foldByKey
combineByKey[C]
sortByKey([ascending], [numTasks])
mapValues
join(otherDataset, [numTasks])
leftOuterJoin
cogroup(otherDataset, [numTasks])
RDD行动算子
reduce(func)
collect()
count()
first()
take(n)
takeOrdered(n)
aggregate
fold(num)(func)
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
countByValue()
foreach(func)
算子的逻辑代码在 executor 执行
算子之外的代码在 driver 执行
RDD序列化
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的
函数的序列化 extends Serializable
kryo框架序列化
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
- kryo 将不支持节点发送的情况 traint 不生效 java早期没有考虑到分布式传输的情况 还就得用kryo框架 - 比java 序列化快10倍 显示的声明
闭包检测
RDD依赖关系
血缘关系
在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区
RDD的Lineage会记录RDD的元数据信息和转换行为
如何查看Lineage
如何查看依赖类型
窄依赖
宽依赖
DAG(Directed Acyclic Graph)
任务划分
名词解释
Application
Job
Stage
Task
Application->Job->Stage-> Task每一层都是1对n的关系
RDD持久化
RDD缓存
RDD CheckPoint
缓存和检查点区别
RDD分区器
概述
Hash
Range
Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
获取RDD分区
Hash分区
Ranger分区
将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,但是分区内的元素是不能保证顺序的。
实现步骤
自定义分区
继承Partitioner
实现方法
numPartitions: Int:返回创建出来的分区数。
getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
equals():Java
RDD文件读取与保存
文件类数据读取与保存
Text文件
Json文件
Csv文件
Sequence文件
Object文件
文件系统类数据读取与保存
HDFS
最抽象的两个函数接口
hadoopRDD
newHadoopRDD
备注
在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取
map-reduce如何读取某一类型数据,将该对应读取方式改写为上述的两种接口
MySQL数据库
jdbcRDD
添加依赖
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
代码详解
MySQL读取
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
MySQL写入
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
HBase数据库
概述
添加依赖
代码详解
HBase读取
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
HBase写入
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
累加器就是在Driver程序中定义的变量 并且能让这个变量在Executor端的每个task都有一个变量副本,每个task更新这些副本的值后,传回Driver端进行merge
系统累加器
自定义累加器
var map : mutable.Map[String,Int] = mutable.Map()
override def isZero: Boolean = map.isEmpty
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccount
override def reset(): Unit = map.clear()
override def add(v: String): Unit = {
map.update(v , map.getOrElseUpdate(v,0) + 1)
}
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
var map2 = other.value
map = map.foldLeft(map2)((map,kv) => {
map.update(kv._1, map.getOrElseUpdate(kv._1,0) + kv._2)
map
})
}
override def value: mutable.Map[String, Int] = map
}
声明广播变量 会将数据广播到 每个Executor端的缓存中
用法
定义
Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,将Spark SQL转换成RDD,然后提交到集群执行
什么是结构化数据
什么是非结构化数据
特点
DataFrame
定义
DataFrame是一个分布式数据容器,除了数据以外,还记录数据的结构信息,即schema,也支持嵌套数据类型(struct、array和map)
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
DataFrame也是懒执行的。性能上比RDD要高,主要原因:
优化的执行计划:查询计划通过Spark catalyst optimiser进行优化
与RDD的关系图
DataSet
SparkSession
val spark : SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
Session范围
DSL语法
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
df.select($"username",$"age" + 1).show
df.select('username, 'age + 1).show()
df.filter($"age">30).show
df.groupBy("age").count.show
RDD、DataFrame、DataSet
RDD - DataFrame
RDD -> DataFrame
import spark.implicits._
val df = rdd.toDF("id","name","age")
DataFrame -> RDD
RDD DataSet
RDD -> DataSet
case (id, name, age) => {
User(id, name, age)
}
}
val userDS = userRDD.toDS()
DataSet -> RDD
DataFrame - DataSet
DataFrame -> DataSet
DataSet -> DataFrame
图示
共性与差异
共性
差异
用户自定义函数
弱类型
//定义用户自定义聚合函数
val udaf = new MyavgUDAF
spark.udf.register("avg" , udaf)
spark.sql("select id,avg(age) from test group by id").show()
- class MyavgUDAF extends UserDefinedAggregateFunction{
//TODO 输入类型
override def inputSchema: StructType = {
StructType(Array(StructField("age", LongType)))
}
//TODO 缓冲区操作类型
override def bufferSchema: StructType = {
StructType(Array(StructField("totalAges", LongType),
StructField("count", LongType)
))
}
//TODO 返回值 类型
override def dataType: DataType = LongType
//TODO 函数的稳定性
override def deterministic: Boolean = true
//TODO 函数缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//TODO 更新缓冲区
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
//TODO 合并缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
//TODO 最终返回值
override def evaluate(buffer: Row): Any = {
buffer.getLong(0) / buffer.getLong(1)
}
}
强类型
val udaf = new MyavgUDAF
val ds: Dataset[User] = frame.as[User]
ds.select(udaf.toColumn).show()
- case class User(
age:Long,
name:String,
id:Int
)
case class avgBuffle (
var totalage : Long,
var totalCount : Long
)
class MyavgUDAF extends Aggregator[User,avgBuffle,Long] {
//初始化
override def zero: avgBuffle = {
avgBuffle(0L,0L)
}
//更新
override def reduce(b: avgBuffle, a: User): avgBuffle = {
b.totalage = b.totalage + a.age
b.totalCount = b.totalCount + 1
b
}
//合并
override def merge(b1: avgBuffle, b2: avgBuffle): avgBuffle = {
b1.totalage += b2.totalage
b1.totalCount += b2.totalCount
b1
}
//返回值
override def finish(reduction: avgBuffle): Long = {
reduction.totalage / reduction.totalCount
}
//DataSet默认额编解码器,用于序列化,固定写法
//自定义类型就是produce 自带类型根据类型选择
override def bufferEncoder: Encoder[avgBuffle] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
通用加载/保存方法
手动指定选项
Spark SQL的默认数据源为Parquet格式
spark.read.format("json").load
peopleDF.write.format("parquet").save
文件保存策略mode()
spark.sql("select * from json.'input/user.json' ")
JSON文件
Parquet文件
CSV文件
JDBC
Hive数据库
内嵌Hive使用
外部Hive应用
定义
特点
架构
文件数据源
文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取
注意事项
val dirDS = ssc.textFileStream("in")
RDD队列
ssc.queueStream(queueOfRDDs)
用法
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
从远程端口采集数据
自定义数据源
用法
var socket: Socket = _
def receive(): Unit = {
val reader: BufferedReader = new BufferedReader(
new InputStreamReader(
socket.getInputStream,
"UTF-8"
)
)
while (true) {
val str = reader.readLine()
if(str != null) {
store(str)
}
}
}
override def onStart(): Unit = {
socket = new Socket(host, port)
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
override def onStop(): Unit = {
socket.close()
socket = null
}
}
Kafka数据源
在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它
用法
导入依赖
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount1")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "test",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaPara))
//5.将每条消息的KV取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
//6.计算WordCount
valueDStream.flatMap(.split(" "))
.map((, 1))
.reduceByKey(_ + _)
.print()
//7.开启任务
ssc.start()
ssc.awaitTermination()
无状态
transform
//TODO code Driver(1)
re.transform(rdd => {
//TODO code Driver(n)
println("ssssssssssssssssssssss")
rdd.map(a => {
//TODO code Executor
a*2
})
}).print()
join
有状态
updateStateByKey
val re: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",8888)
re.flatMap(.split(" ")).map((,1))
.updateStateByKey[Long](
(seq : Seq[Int],op:Option[Long]) => {
Option(op.getOrElse(0L) + seq.sum)
}
)
- seq 相同key下 数量的集合 - 类似于缓冲区 buffer 需要落盘 提前声明 setCheckpointDir
window
re.flatMap(.split(" ")).map((,1)).window(Seconds(9),Seconds(6)).reduceByKey(+).print()
- 计算周期 == 滑动步长 Seconds(6)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
print()
saveAsTextFiles
saveAsObjectFiles
saveAsHadoopFiles
foreachRDD(func)
这是最通用的输出操作 它用来对DStream中的RDD运行任意计算
常见的用例之一是把数据写到诸如MySQL的外部数据库中
注意
(1)内容
阶段ID
RDD元数据,依赖
分区
首选位置
(2)序列化(累加器,KRYO)
默认序列化:Java序列化
Kryo序列化:shuffle的场合下,如果类型为值类型或字符串类型
(3)调度
Driver
TaskSet - Stage
TaskSetManager - Scheduler
调度模式:FIFO(默认), FAIR
任务调度池:Pool(FIFO)
Executor
本地化级别(调度)
降级处理:默认等待3s
(4)计算
任务的编码
向Executor发送Task
Executor接收Task,并进行解码
计算对象执行Task
线程池执行task.run方法, 调用具体Task对象的runTask方法。
(5)Shuffle
Shuffle map(Write)
Shuffle reduce(Read)
管理器:SortShuffleManager
ShuffleWriter
BypassMergeSortShuffleWriter
UnsafeShuffleWriter
SortShuffleWriter
写磁盘文件时,首先进行内存中的排序,如果内存(5M)不够,会溢写磁盘,生成临时文件,最终将所有的临时文件合并成数据文件和索引文件。
预聚和的原理:在排序时,构造了一种类似于hashtable的结构,所以想同的key就聚合在一起。
ShuffleHandle
BypassMergeSortShuffleHandle: 没有预聚和功能的算子 & reduce的分区数量 <=阈值(200)
SerializedShuffleHandle :Spark的内存优化后的解决方案,对象序列化后不需要反序列化。!(支持序列化重定位 & 支持预聚和 & 分区数量 大于阈值)
BaseShuffleHandle :默认的shuffleHandle
(6)内存(cache)
静态内存管理
统一内存管理
(7)累加器
Spark 1.6 之前使用的是静态内存管理 (StaticMemoryManager) 机制,StaticMemoryManager 也是 Spark 1.6 之前唯一的内存管理器。
在 Spark1.6 之后引入了统一内存管理 (UnifiedMemoryManager) 机制,UnifiedMemoryManager 是 Spark 1.6 之后默认的内存管理器
1.6 之前采用的静态管理(StaticMemoryManager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用
静态内存管理
统一内存管理
两者都有存在的意义 首先RDD是高容错的 计算失败 会根据血缘关系重新计算
由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition
由于重新计算体量太大 有时候影响性能,所以有了cache 持久化机制 可以将shuffle的数据 持久化到内存和磁盘等位置。
但是由于cache 是把数据持久化到内存和磁盘 可靠性并不高 所以有个checkpoint机制 可以支持数据持久化到可靠性高的外部存储 比如HDFS等 支持副本。
所以本质上
1.首先Spark Streaming默认job数是1 默认FIFO调度(容易产生如下问题)
Spark Streaming 的job生成是周期性的。当前job的执行时间超过生成周期就会产生job 累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制
2.因为有上面的问题 所以一般会选择FAIR调度模式 并且增加spark.streaming.concurrentJobs 个数
conf.set("spark.scheduler.mode", "FAIR")
运行的job就会均分所有executor提供的资源
3.因为Spark Streaming的任务存在Fair模式下并发的情况,所以需要在使用单例模式生成broadcast的时候要注意声明同步:
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
如果碰到是一个stage中 每个task数据量都很大 有可能是数据本地化的问题,因为task交给executor执行的时候 要看前面是不是有task执行 有的话默认等待3s 可以把3s 设置成0s 并且core数设置成task个数
本地化调度
本地化等待时长0 task = core
本地化等待时长6 task => core 2 3 倍数
宽窄依赖的算子
aggregate
reduce 同上
在使用完filter算子后,继续调用coalesce算子进行优化。
mapPartition 内部是迭代器 遍历转List
repartition shuffle
coaless no shuffle
reduceByKey(func)
foldByKey(zero)(func)
aggregateByKey(zero)(func1,func2)
combinerByKey(func1,func2,func3)
1.溢写不同机制的不同
2.MapReduce 进程级别的。 Spark 线程模型(减少了重新启动JVM进程带来的性能开销)
3.MapReduce shuffle的过程需要经历过多的排序。Spark 分情况排序
Broadcast Hash Join:
Hash join。 适合 很小的表 + 大表
将小表 广播出去 加载到内存(事实表和维度表的join处理) ===> 小表的边界是10M
再在每个executor上执行单机版hash join,小表映射,大表试探;
Shuffle Hash Join:
Hash join。 适合 小表 + 小表 或者 小表 + 大表
如果表过大 再广播会对driver端和executor端造成较大的压力。 那么可以分而治之 两个表中,key相同的行都会被shuffle到同一个分区中 分别进行Hash Join。
Sort Merge Join:
适合大表 + 大表
如果两个表都很大 那么不适合加载到内存。
那么可以先shuffle 让两张表数据分布到整个集群
再对两个表分别进行排序
分别遍历两个有序序列 碰到相同的join key 就合并 否则取更小的一边。
Spark SQL是如何选择join策略的?
广播变量 共享读
累加器 共享写
累加器原理
driver 端定义 序列化 => executor端 反序列化 => task执行结果 => driver端 汇总
弊端
map执行中内存溢出
shuffle后内存溢出
OOM的问题通常出现在execution这块内存中,因为storage这块内存在存放数据满了之后,会直接丢弃内存中旧的数据,对性能有影响但是不会有OOM的问题
解决
大规模 无边界 乱序的数据集 ==> 解决这种问题的一种数据模型
无论是流式计算/微批次或是批处理,它们要处理的问题都可以抽象为以下几个问题:
What: 需要计算的结果数据是什么
Where: 计算的上下文环境是什么
When: 什么时候计算输出结果
How: 如何修正早期计算结果
dataflow 的思想 => spark streaming 微批次处理 (这段时间给flink 发家的机会) => spark 2.0 版本 结构化流(统一批流 api
但是实质上 还是周期性 + 微批次处理 100hm级别 checkpoint + 预写日志 --> 精准一次性) => spark 2.3 版本 结构化流 连续模式 实验性的发布 真正利用 dataflow思想 lamdba batch view ,real time view 和 query view 的思想 利用触发器 + checkpoint 间隔时间
连续处理 **秒 记录查询的进度 触发器再次启动查询(相当于一边实时记录查询的进度 一边触发启动任务的查询)
Structured Streaming
是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于event_time的时间窗口的处理逻辑
(Spark Streaming) VS (Structured Streaming)
Spark Streaming 不足
Structured Streaming 优势
版本特性
spark 2.0 版本
合并dataframe和datasets
新增结构化流
干掉了 HashShuffle 加入了BypassShuffle
新增高级函数
spark 2.2 版本
spark 2.3 版本
spark 2.4 版本
spark 3.0 版本
自适应查询优化
动态分区修剪
动态分区裁剪就是基于运行时(run time)推断出来的信息来进一步进行分区裁剪
要求都是分区表 并且字段在on条件内 还需要spark.sql.optimizer.dynamicPartitionPruning.enabled 设置为true
还需要另外两个参数达到一定的条件 才支持分区修剪
静态分区裁剪
支持 Hadoop 3.x
结构化流式用户界面
未经优化的HashShuffleManager
优化后的HashShuffleManager
SortShuffleManager(普通运行机制)
shuffle 溢写条件
elementsRead % 32 == 0 && currentMemory >= 5M
reduceByKey这种预聚合的算子 ==> 选用Map进行聚合 ==> 写入内存
join 这种普通的shuffle算子 ==> 选用Array ==> 写入内存
溢写前 ==> 排序 ==> 分批次(10000条) 写入磁盘
写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
bypass运行机制
SQL 解析阶段 - SparkSqlParser
绑定逻辑计划阶段 - Analyzer
优化逻辑计划阶段 - Optimizer
生成可执行的物理计划阶段 - SparkPlanner
全阶段代码生成阶段 - WholeStageCodegen
代码编译
SQL 执行
catalyst是sql的一个解析器引擎
Catalyst工作流程
Spark内存均特指Executor的内存
Spark内存“规划式”管理:对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存
申请内存的过程
释放内存的过程
堆内内存 分为几个部分组成
为什么设置300M预留内存?
堆外内存
内存管理模式:静态 统一
内存管理
存储内存
RDD持久化机制
依赖于块管理器:BlockManager
BlockManagerMaster 负责存储元数据信息 协调各个BlockManager的工作
BlockManager 负责数据存储和传递
shuffle write的时候 触发BlockManager写操作 先写内存 再写磁盘() 如果有副本 则进行同步
shuffle read的时候 先读本地 如果没有再从别的BlockManager 读取
RDD缓存
执行内存
令牌桶原理。动态调节数据接收速率的功能
有Receiver时反压原理
没有Receiver时反压原理
最优资源配置
RDD优化
并行度调节
广播大变量
Kryo序列化
调节本地化等待时长
数据倾斜发生时的现象?
如何定位导致数据倾斜的代码?
如何查看导致数据倾斜的key的数据分布情况?
如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码
数据倾斜的解决方案
一:上游是hive的情况
使用Hive ETL预处理数据
把数据倾斜的问题交给上游处理
优点
缺点
实践经验
二:极少的key造成的数据倾斜 并且这些key无关紧要的情况
过滤少数导致倾斜的key
优点
缺点
实践经验
三:处理数据倾斜最简单的一种方案
提高shuffle操作的并行度
优点
缺点
实践经验
四:聚合类的shuffle操作引发的数据倾斜
加盐+两阶段聚合(局部聚合+全局聚合)
// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
new PairFunction<Tuple2<Long,Long>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(10); return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2); } });
// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });
// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception { long originalKey = Long.valueOf(tuple._1.split("_")[1]); return new Tuple2<Long, Long>(originalKey, tuple._2); } });
// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });
五:join操作 小表和大表
将reduce join转为map join
// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception { // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。 List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value(); // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。 Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>(); for(Tuple2<Long, Row> data : rdd1Data) { rdd1DataMap.put(data._1, data._2); } // 获取当前RDD数据的key以及value。 String key = tuple._1; String value = tuple._2; // 从rdd1数据Map中,根据key获取到可以join到的数据。 Row rdd1Value = rdd1DataMap.get(key); return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value)); } });
// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,
// 在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。
六:join操作 大表和大表(数据倾斜key不多的情况)
采样倾斜key并拆分join操作
// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<Long, String> tuple) throws Exception { return new Tuple2<Long, Long>(tuple._1, 1L); } });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple) throws Exception { return new Tuple2<Long, Long>(tuple._2, tuple._1); } });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<Long, String> tuple) throws Exception { return tuple._1.equals(skewedUserid); } });
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<Long, String> tuple) throws Exception { return !tuple._1.equals(skewedUserid); } });
// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,
// 并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
new Function<Tuple2<Long,Row>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<Long, Row> tuple) throws Exception { return tuple._1.equals(skewedUserid); } }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() { private static final long serialVersionUID = 1L; @Override public Iterable<Tuple2<String, Row>> call( Tuple2<Long, Row> tuple) throws Exception { Random random = new Random(); List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>(); for(int i = 0; i < 100; i++) { list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2)); } return list; } });
// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(100); return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2); } }) .join(skewedUserid2infoRDD) .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Tuple2<String, Row>> call( Tuple2<String, Tuple2<String, Row>> tuple) throws Exception { long key = Long.valueOf(tuple._1.split("_")[1]); return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2); } });
// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
七:RDD中有大量的key导致数据倾斜
使用随机前缀和扩容RDD进行join
// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() { private static final long serialVersionUID = 1L; @Override public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple) throws Exception { List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>(); for(int i = 0; i < 100; i++) { list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2)); } return list; } });
// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception { Random random = new Random(); int prefix = random.nextInt(100); return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2); } });
// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
类似于方案六 只是不拆分
缺点
XMind: ZEN - Trial Version