本文主要是介绍sparkstreaming转换算子--窗口函数,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
window
- 画图理解
- 说明
countByWindow 对每个滑动窗口的数据执行count操作
reduceByWindow 对每个滑动窗口的数据执行reduce操作
reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
都需要传入两个核心参数
windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍
slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍
- 案例
package SparkStreaming.trans
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/**
* 滑动窗口算子
* 每隔一段时间,对原始DStream中多个批次数据整合 成为新的DStream中一个批次数据
*
* Spark Streaming中,一个批次执行一次,不会积攒当前批次的数据。滑动窗口算子可以实现将多个批次数据积攒下来,然后再去做统一的运算
* 窗口算子最为基础核心的算子 window 会给我们返回一个新的DStream,但是这个DStream包含多个未被处理的批次数据
* 窗口函数中需要传递核心参数
* windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍
* slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍
*/object ByWindow {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
val ssc = new StreamingContext(conf, Milliseconds(10000))
val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
// 窗口长度 30s 滑动间隔 10s 每个10s时间将DStream中前30秒的数据 整合为一个批次数据处理
val ds1 = ds.window(Seconds(10), Seconds(10))
ds1.print()
ssc.start()
ssc.awaitTermination()
}
}
package SparkStreaming.trans
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}
/**
* 使用window窗口函数算子 严格意义上只负责去对原始DStream进行窗口检测,形成窗口批次数据的DStream,如果我们要对窗口批次数据
* 进行处理的话,还得需要对窗口批次数据的DStream使用转换算子和行动算子计算逻辑
*
* windows函数也有一些变种的窗口函数算子:既可以实现窗口批次数据的检测,也可以实现一些相关的计算功能
* countByWindow 对每个滑动窗口的数据执行count操作
* reduceByWindow 对每个滑动窗口的数据执行reduce操作
* reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
* countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
*/
object ByWindow2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("state02").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Milliseconds(10000))
ssc.checkpoint("hdfs://node1:9000/sparkstreaming")
val ds:DStream[String] = ssc.socketTextStream("node1", 44444)
val ds1 = ds.map((_, 1))
val ds2 = ds1.reduceByKeyAndWindow((a: Int, b: Int)=>(a+b), Seconds(10), Seconds(10))
ds2.print()
println(",,,,,,,,,,")
val ds3: DStream[Long] = ds1.countByWindow(Seconds(10), Seconds(10))
ds3.print()
println(",,,,,,,,,,")
val ds4 = ds1.reduceByWindow((a, b) => (a._1+b._1, 0), Seconds(10), Seconds(10))
ds4.print()
println(",,,,,,,,,,")
// 需要设置检查点
val ds5 = ds1.countByValueAndWindow(Seconds(10), Seconds(10))
ds5.print()
ssc.start()
ssc.awaitTermination()
}
}
应用场景:黑名单
package SparkStreaming.trans
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}
/**
* 黑名单统计
* 有市场就有竞争,有竞争就少不来邪门外道
* A厂家投放了广告,广告每点击一次都是有记录的,但是不排初竞争对手的恶意点击
*
* 实时统计黑名单用户
* 网站每隔3秒记录一批次用户的点击行为,记录的时候,认定如果在1分钟之内 用户点击次数超过10次 认定这个用户是一个黑名单用户
* 需要把用户IP封掉
*
* Spark Streaming去连接端口数据源:
* 端口模拟用户的点击行为 发送数字 数字就代表某一个用户id
*/
object BlackUser {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("state01").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Milliseconds(3000))
val ds:DStream[String] = ssc.socketTextStream("node1", 44444)
val ds1:DStream[String] = ds.window(Minutes(1),Minutes(1))
val ds2:DStream[(String,Int)] = ds1.map((_, 1)).reduceByKey(_ + _)
//保留黑名单用户
val ds3:DStream[(String,Int)] = ds2.filter(tuple=>{
if(tuple._2>=10){
true
}else{
false
}
})
ds3.print()
ssc.start()
ssc.awaitTermination()
}
}
这篇关于sparkstreaming转换算子--窗口函数的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!