相信用过MySQL的朋友都知道,MySQL中也有开窗函数的存在。开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。
聚合函数是将多行变成一行,count,avg…
开窗函数是将一行变成多行;
聚合函数如果要显示其他的列必须将列加入到group by中
开窗函数可以不使用group by,直接将所有信息显示出来
聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。
排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。
进入到SparkShell命令行
/export/servers/spark/bin/spark-shell --master spark://node01:7077,node02:7077
创建一个样例类,用于封装数据
case class Score(name: String, clazz: Int, score: Int)
创建一个RDD数组,造一些数据,并调用toDF
方法将其转换成DataFrame
val scoreDF = spark.sparkContext.makeRDD(Array( Score("a1", 1, 80), Score("a2", 1, 78), Score("a3", 1, 95), Score("a4", 2, 74), Score("a5", 2, 92), Score("a6", 3, 99), Score("a7", 3, 99), Score("a8", 3, 45), Score("a9", 3, 55), Score("a10", 3, 78), Score("a11", 3, 100)) ).toDF("name", "class", "score")
创建一个临时表
scoreDF.createOrReplaceTempView("scores")
查询所有数据
scoreDF.show() +----+-----+-----+ |name|class|score| +----+-----+-----+ | a1| 1| 80| | a2| 1| 78| | a3| 1| 95| | a4| 2| 74| | a5| 2| 92| | a6| 3| 99| | a7| 3| 99| | a8| 3| 45| | a9| 3| 55| | a10| 3| 78| | a11| 3| 100| +----+-----+-----+
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。
SQL标准允许将所有聚合函数用做聚合开窗函数。
spark.sql("select count(name) from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show
查询结果如下所示:
查询结果如下所示: +----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 11| | a2| 1| 78| 11| | a3| 1| 95| 11| | a4| 2| 74| 11| | a5| 2| 92| 11| | a6| 3| 99| 11| | a7| 3| 99| 11| | a8| 3| 45| 11| | a9| 3| 55| 11| | a10| 3| 78| 11| | a11| 3| 100| 11| +----+-----+-----+----------+
开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。
下面的 SQL 语句用于显示按照班级分组后每组的人数:
OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。
spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show
查询结果如下所示:
+----+-----+-----+----------+ |name|class|score|name_count| +----+-----+-----+----------+ | a1| 1| 80| 3| | a2| 1| 78| 3| | a3| 1| 95| 3| | a6| 3| 99| 6| | a7| 3| 99| 6| | a8| 3| 45| 6| | a9| 3| 55| 6| | a10| 3| 78| 6| | a11| 3| 100| 6| | a4| 2| 74| 2| | a5| 2| 92| 2| +----+-----+-----+----------+
row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号
注意:
在排序开窗函数中使用 PARTITION BY 子句需要放置在ORDER BY 子句之前。
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 5| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 10| | a11| 3| 100| 11| +----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。
这个函数求出来的排名结果可以并列(并列第一/并列第二),并列排名之后的排名将是并列的排名加上并列数
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名。
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a10| 3| 78| 4| | a2| 1| 78| 4| | a1| 1| 80| 6| | a5| 2| 92| 7| | a3| 1| 95| 8| | a6| 3| 99| 9| | a7| 3| 99| 9| | a11| 3| 100| 11| +----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
dense_rank() over(order by score) as dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。
这个函数并列排名之后的排名是并列排名加1
简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 2| | a4| 2| 74| 3| | a2| 1| 78| 4| | a10| 3| 78| 4| | a1| 1| 80| 5| | a5| 2| 92| 6| | a3| 1| 95| 7| | a6| 3| 99| 8| | a7| 3| 99| 8| | a11| 3| 100| 9| +----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 4| | a11| 3| 100| 5| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。
spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a8| 3| 45| 1| | a9| 3| 55| 1| | a4| 2| 74| 2| | a2| 1| 78| 2| | a10| 3| 78| 3| | a1| 1| 80| 3| | a5| 2| 92| 4| | a3| 1| 95| 4| | a6| 3| 99| 5| | a7| 3| 99| 5| | a11| 3| 100| 6| +----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show() +----+-----+-----+----+ |name|class|score|rank| +----+-----+-----+----+ | a2| 1| 78| 1| | a1| 1| 80| 2| | a3| 1| 95| 3| | a8| 3| 45| 1| | a9| 3| 55| 2| | a10| 3| 78| 3| | a6| 3| 99| 4| | a7| 3| 99| 5| | a11| 3| 100| 6| | a4| 2| 74| 1| | a5| 2| 92| 2| +----+-----+-----+----+
本次的分享就到这里,受益的朋友或对大数据技术感兴趣的伙伴可以点个赞关注一下博主,后续会持续更新大数据的相关内容,敬请期待(✪ω✪)