对于在开发过程中可能出现的数据倾斜问题,可提供一种利用双重group by的方法来解决。
分析:
可以使用类似于 SparkCore 中解决数据倾斜时提出的两阶段聚合(局部聚合 + 全局聚合)方案:局部——随机打散并添加前缀,通过 groupBy 完成局部统计;全局——去掉前缀,再通过 groupBy 完成全局统计。
/**
 * Demonstrates fixing data skew in Spark SQL with two-phase (double group-by)
 * aggregation: scatter skewed keys with a random prefix, aggregate locally,
 * strip the prefix, then aggregate globally.
 */
object _05SparkSQLOptimizationOps {

  // Shared PRNG — avoids allocating a new Random on every UDF invocation.
  private val random = new Random()

  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so the demo output stays readable.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(s"${_05SparkSQLOptimizationOps.getClass.getSimpleName}")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Register the UDFs used by the two-phase aggregation below.
    sqlContext.udf.register[String, String, Int]("addRandomPrefix",
      (field, num) => addRandomPrefix(field, num))
    sqlContext.udf.register[String, String]("removePrefix",
      field => removePrefix(field))

    val df = sqlContext.read.text("E:/data/hello.log").toDF("line")
    // df.show()
    // SQL flavour of the same pipeline; registered as a temp table first.
    df.registerTempTable("test")
    // groupByOps1(sqlContext)

    // Step 1: attach a random prefix in [0, 2) to every word to scatter skewed keys.
    sqlContext.sql("select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w").show()

    // Step 2: local (per-prefixed-key) counts — skew is spread across prefixes.
    sqlContext.sql("select " +
      "p.p_word," +
      "count(p.p_word) as p_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word").show()

    // Step 3: strip the prefix while keeping the local counts.
    sqlContext.sql("select " +
      "removePrefix(p.p_word) as r_word," +
      "count(p.p_word) as r_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word").show()

    // Step 4: global aggregation — sum the local counts per original word.
    sqlContext.sql("select " +
      "r.r_word as field, " +
      "sum(r.r_count) as sum " +
      "from (" +
      "select " +
      "removePrefix(p.p_word) as r_word," +
      "count(p.p_word) as r_count " +
      "from (" +
      "select " +
      "addRandomPrefix(w.word, 2) as p_word " +
      "from (" +
      "select " +
      "explode(split(line, ' ')) as word " +
      "from test" +
      ") w" +
      ") p " +
      "group by p.p_word" +
      ") r " +
      "group by r.r_word").show()

    sc.stop()
  }

  /**
   * Same two-phase pipeline expressed as a chain of intermediate temp tables,
   * one per stage, instead of a single deeply nested query.
   */
  private def groupByOps1(sqlContext: SQLContext) = {
    // Split each line into words.
    sqlContext.sql("select explode(split(line, ' ')) as word from test")
      .registerTempTable("word_tmp")
    // Add the random prefix.
    sqlContext.sql("select addRandomPrefix(word, 2) as p_word from word_tmp")
      .registerTempTable("prefix_word_tmp")
    // Local aggregation.
    sqlContext.sql("select p_word, count(p_word) as p_count from prefix_word_tmp group by p_word")
      .registerTempTable("prefix_count_word_tmp")
    // Strip the prefix.
    sqlContext.sql("select removePrefix(p_word) as r_word, p_count as r_count from prefix_count_word_tmp")
      .registerTempTable("r_prefix_count_word_tmp")
    // Global aggregation.
    sqlContext.sql("select r_word, sum(r_count) r_sum from r_prefix_count_word_tmp group by r_word").show()
  }

  /**
   * Prepends a random numeric prefix to scatter a skewed key.
   *
   * @param field value to prefix
   * @param num   exclusive upper bound of the prefix, i.e. prefix ∈ [0, num)
   * @return "prefix_field"
   */
  def addRandomPrefix(field: String, num: Int): String = {
    val prefix = random.nextInt(num)
    prefix + "_" + field
  }

  /**
   * Strips the random prefix added by [[addRandomPrefix]].
   *
   * Splits on the FIRST underscore only, so a word that itself contains
   * underscores (e.g. "1_hello_world") round-trips intact — the original
   * `field.split("_")(1)` silently dropped everything after the second
   * underscore and threw on values without one.
   *
   * @param field prefixed value such as "1_hello"
   * @return the original value; returned unchanged when no prefix is present
   */
  def removePrefix(field: String): String = {
    val idx = field.indexOf('_')
    if (idx < 0) field else field.substring(idx + 1)
  }
}