作者:韩信子@ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/179
声明:版权所有,转载请联系平台与作者并注明出处
Spark Streaming是Spark核心API的一个扩展,可以实现实时数据的可拓展,高吞吐量,容错机制的实时流处理框架。
Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream 是由这些RDD 所组成的序列(因此得名“离散化”)。
定义一个RDD处理逻辑,数据按照时间切片,每次流入的数据都不一样,但是RDD的DAG逻辑是一样的,即按照时间划分成一个个batch,用同一个逻辑处理。
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。
Spark Streaming有下述一些特点:
易用:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序求照的器。
容错:Spark Streaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。首先要明确一下Spak中RDD的容错机制,即每一个RDD都是个不可变的分布式可重算的数据集,它记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都可以使用原始输入数据经过转换操作重新计算得到。
易整合到Spark体系中:Spark Streaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,实现交互式的查询操作。
大家知道Spark的工作机制如下:
而SparkStreaming架构由三个模块组成:
在上图中几个核心的角色和功能分别是:
Master:记录Dstream之间的依赖关系或者血缘关系,并负责任务调度以生成新的RD
Worker:
Client:负责向Spark Streaming中灌入数据(flume kafka)
Spark Sreaming的作业提交包含的组件和功能分别为:
具体的作业提交流程如下:
要传入的数据会编排成block id(元数据)的形式,再加上RDD的逻辑,就生产了job scheduler,通过job manager形成job queue,以队列形式有序执行。真正的数据是以block形式传入worker,由worker上的executor通过元数据信息Block ID去HDFS上拉取对应的block数据进行执行。
Network Input Tracker传入的并不是真正的数据,而是Block IDs,相当于获取的是元数据,数据是通过worker进行接受的,也就是说Master上不管真正数据的接受情况,Master上只是能够拿到数据block的id,至于这些block做什么操作,是会放到 Job Manager去,按照顺序执行。
Discretized Stream 是Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的RDD 来表示。每个RDD 含有一段时间间隔内的数据。
简单来说,SparkStreaming接受实时的数据流,把数据按照指定的时间段切成一片片小的数据块(SparkStreaming将每个小的数据块当作RDD来处理),然后把数据块传给Spark Engine处理,最终得到一批批的结果。
对于Streaming来说,它的单位是DStream,而对于SparkCore,它的单位是RDD。针对Spark开发,就是开发RDD的DAG图,而针对SparkStreaming,就是开发DStream。
DStream 代表连续的一组RDD,每个RDD都包含特定时间间隔的数据。DStream内部的操作,可以直接映射到内部RDD进行,相当于DStream是在RDD上增加一个时间的维度得到的。RDD是DStream最小的一个数据单元。DStream中对数据的操作也是按照RDD为单位来进行的。
简单来理解,SparkStreaming对于流数据的处理速度是秒级别,无法达到Storm的毫秒级别,因此也可以将Streaming看作是微批处理。
大家在上文中频繁看到Dstream的核心概念,下面我们对其做一些展开讲解。
整体上看,Spark Streaming 的处理思路:将连续的数据持久化、离散化,然后进行批量处。
对上面这句话进行分析:
Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。
此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用local 或者 local。
DStream 上的原语与 RDD 的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window 相关的原语。
① TransFormation
② Output
DStream = [rdd1, rdd2, …, rddn]
RDD两类算子:transformation、action
DStream两类算子:transformation、output
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
函数名称 | 目的 | Scala示例 | 用来操作 DStream[T] 的用户自定义函数的函数签名 |
---|---|---|---|
map ( ) | 对 DStream 中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream | ds.map(x => x + 1) | f : (T) -> U |
flatMap( ) | 对 DStream 中的每个元素应用给定函数,返回由各元素输出的迭代器组成的 DStream | ds.flatMap(x => x.split (“ ”) ) | f : T -> Iterable [U] |
filter( ) | 返回由给定 DStream 中通过筛选的元素组成的 DStream | ds.filter(x => x! = 1 ) | f : T -> Boolean |
repartition( ) | 改变 DStream 的分区数 | ds.repartition(10 ) | N / A |
reduceByKey( ) | 将每个批次中键相同的记录归约 | ds.reduceByKey((x, y) => x + y) | f : T , T -> T |
groupByKey( ) | 将每个批次中的记录根据键分组 | ds.groupByKey( ) | N / A |
需要注意的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream 在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值对DStream 拥有和RDD 一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream 上使用这些操作,这样就对每个批次分别执行了对应的RDD 操作。
我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也可以使用 StreamingContext.union()来合并多个流。
① UpdateStateByKey (全局统计量)
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。
给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
为使用这个功能,你需要做下面两步:
如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制,这样的话才能把每个key对应的state除了在内存中有,在磁盘上也checkpoint一份。因为要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以避免内存数据的丢失。
主要解决:比如说在双十一统计一天销量和成交金额,这些计算需要全量汇总,对数据进行累加,就需要避免数据在内存中丢失,造成不准确。
② Window Operations
Window Operations 有点类似于 Storm 中的 State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming 的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次(在窗口内的批次)的结果,计算出整个窗口的结果。
简单来说,Streaming的Window Operations是Spark提供的一组窗口操作,通过滑动窗口的技术,对大规模数据的增量更新进行统计分析,即定时进行一段时间内的数据处理。
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。
滑动窗口的长度必须是滑动时间间隔的整数倍。因为RDD是DStream上最小的数据单元不可切分。如果不是整数倍,会出现一个RDD被切分的情况,程序会报错。
DStream Graph是一系列transformation操作的抽象,例如:
c = a.join(b), d = c.filter() 时, 它们的 DAG 逻辑关系是a/b → c,c → d,但在 Spark Streaming 在进行物理记录时却是反向的 a/b ← c, c ← d, 目的是为了追溯。
Dstream之间的转换所形成的的依赖关系全部保存在DStreamGraph中, DStreamGraph对于后期生成RDD Graph至关重要。
DStreamGraph有点像简洁版的DAG scheduler,负责根据某个时间间隔生成一序列 JobSet,以及按照依赖关系序列化
代码是一直在跑的,每隔一定时间就会形成一个RDD。
DStream.map(RDD => RDD.map)
随看时间的流逝,基于 Dstream Graph不断的生成 RDD Graph也就是DAG的方式产生Job,并通过 Jobscheduler 的线程池提交给 SparkCluster不断的执行。
每个时间间隔会积累一定的数据,这些数据可以看成由 event 组成(假设以 kafka 或者Flume为例),时间间隔是固定的,在时间间隔内的数据就是固定的。也就是RDD是由一个时间间隔内所有数据构成。时间维度的不同,导致每次处理的数据量及内容不同。
我们先来看一看一个简单的 Spark Streaming 程序的样子。假如我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数,我们可以按照如下步骤进行:
① 首先, 我们导入StreamingContext,这是所有流功能的主要入口点。 我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext。
from pyspark import SparkContext from pyspark.streaming import StreamingContext # 创建一个具有两个工作线程(working thread)并且批次间隔为 1 秒的本地 StreamingContext . sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 1)
② 使用该 context,我们创建一个代表从 TCP 源流数据的DStream,指定主机名(例如 localhost)和端口(例如 9999)。
# 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999 lines = ssc.socketTextStream("localhost", 9999)
③ 上述lines DStream 表示将要从数据服务器接收到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们希望通过空格字符拆分这些数据,把每一行切分为单词。
# 将每一行拆分成单词 words = lines.flatMap(lambda line: line.split(" "))
④ flatMap 是一种一对多的DStream操作,它会通过在源DStream中根据每个记录生成多个新纪录的形式创建一个新的DStream。在这种情况下,每一行都将被拆分成多个单词和代表单词DStream的单词流。下一步,我们想要计算这些单词:
# 计算每一个 batch(批次)中的每一个 word(单词) pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # 在控制台打印出在这个DStream中生成的每个 RDD 的前十个元素 wordCounts.print()
上述单词DStream进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的DStream,这个 DStream 然后被reduce来获得数据中每个批次的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的统计结果。
⑤ 注意当这些行被执行的时候, Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地处理。为了在所有的转换都已经设置好之后开始处理,我们在最后调用:
ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
该部分完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
如果你已经 下载 并且 构建 Spark, 您可以使用如下方式来运行该示例. 你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。
$ nc -lk 9999
然后,在另一个不同的终端,你可以通过执行如下命令来运行该示例:
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world # TERMINAL 2: RUNNING network_wordcount.py $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999 ... ------------------------------------------- Time: 2014-10-14 15:25:21 ------------------------------------------- (hello,1) (world,1) ...