DStream 操作和RDD的操作很类似,分为 Transformations 转换和 Output Operations 输出两种,此外转换操作中还有一些比较复杂的算子,如:updateStateByKey(),transform()以及各种 Window 相关算子。
无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream上的每一个RDD,下表是部分无状态转换操作算子:
需要注意:尽管这些算子看起来像作用在整个流上一样,但事实是每个DStream在内部都是由许多的RDD(批次)组成,且无状态转换操作时分别应用到每个RDD上的。例如:reduceBykey() 会规约每个时间区间的数据,但不会规约不同区间之间的数据。在Streaming wordCount 输出结果可以看出,我们只会统计采集周期内的单词个数,而不是跨采集周期进行累加。
Transform 是一个比较特殊的无状态转换算子。它允许DStream 上执行任意的 RDD-to-RDD h函数,即使函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API,该函数每一批次调度一次,Transform其本质是对DStream中的RDD进行转换。
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaing") //StreamingContext 两个参数 sparkConf 配置文件 Seconds(3) 微批采集周期 val ssc = new StreamingContext(sparkConf, Seconds(3)) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103", 9999) /** * transform 可以获取底层RDD进行操作 * Driver 端(周期性执行) * 应用场景 DStream 功能不完善 需要代码周期性执行 */ val sordRDD: DStream[(String, Int)] = lines.transform( // transform 将 DStream 转换为 RDD进行一些列操作,再返回为 DStream rdd => { rdd.flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .sortByKey() } ) sordRDD.print() // 启动采集器 ssc.start() //等待采集器关闭 ssc.awaitTermination() }
UpdateStateByKey 算子应用于将历史结果应用月当前批次,才操作允许在使用新的信息不断更新撞他的同事保留他的状态。有时需要在DStream中跨批次维护状态*(例如跨批次累加wordCOunt),针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的DStream,给定一个有键 事件构成的 DStream,并传递一个指定如何根据新的事件更新每一个键对应状态的函数,他可以构建出一个新的DStream,其内部为键 状态对UpdateStateByKey的结果会是一个新的DStream,其内部的RDD是由每个时间区间的的对应键值对组成。为了是由UpdateStateByKey,需要做一下两步处理
说明:是由updateStateBykey 需要设置检查点来保存数据状态。
wordcount
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaing") //StreamingContext 两个参数 sparkConf 配置文件 Seconds(3) 微批采集周期 val ssc = new StreamingContext(sparkConf, Seconds(3)) //创建检查点目录,用于保存状态 ssc.checkpoint("E:\\workspace_idea\\wdh01-clases\\data") val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103", 9999) val mapDS: DStream[(String, Int)] = lines.flatMap(_.split(" ")) .map((_, 1)) /** * updateStateByKey 有状态聚合操作 * 记录上个采集周期的状态,和当期采集周期数据进行聚合 */ val resDS: DStream[(String, Int)] = mapDS.updateStateByKey( (seq: Seq[Int], state: Option[Int]) => { //Option[Int] Int 是累加数据度量值的类型
测试结果
------------------------------------------- Time: 1650428595000 ms ------------------------------------------- (spark,1) (hello,1) ------------------------------------------- Time: 1650428598000 ms ------------------------------------------- (spark,1) (hello,1) ------------------------------------------- Time: 1650428601000 ms ------------------------------------------- (spark,1) (hello,1) ------------------------------------------- Time: 1650428604000 ms ------------------------------------------- (spark,1) (hello,1) (sprk,1) ------------------------------------------- Time: 1650428607000 ms ------------------------------------------- (spark,1) (hello,1) (sprk,1)
可以看到,新的采集周期的输出附带了历史采集周期的数据的数据结果,
Spark Streaming 提供了窗口计算。允许执行转换操作使其作用于所在一个窗口内的数据。默认情况下,计算只对时间内的RDD机型,有了窗口之后,可以把计算应用到一个指定的窗口内所有的RDD上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作需要两个参数,窗口时长&滑动步长。
说明:这两个参数必须是采集周期的整数倍。如下图所示,wordCOunt 案例,窗口大小是批次的2倍,滑动步长=批次大小。
例如:一小时人流量的变化,窗口(6秒)和间隔(3秒)不一致,不一定从程序启动开始需求:WordCount统计 3秒一个批次,窗口6秒,滑步3秒。
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaing") //StreamingContext 两个参数 sparkConf 配置文件 Seconds(3) 微批采集周期 val ssc = new StreamingContext(sparkConf, Seconds(3)) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop103", 9999) //窗口范围是采集周期的整数倍 //窗口是可以滑动的,默认情况下 一个采集周期进行滑动 //这种情况可能会出现重复数据,为了避免这种情况可以适度改变滑动幅度,滑动步长大于等于窗口大小 //对一个是窗口大小 第二个滑动步长 lines.window(Seconds(6), Seconds(3)) .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .print() // 启动采集器 ssc.start() //等待采集器关闭 ssc.awaitTermination() }