Javascript

Spark SQL中将 DataFrame 转为 json 格式

本文主要是介绍Spark SQL中将 DataFrame 转为 json 格式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。

用过 Spark SQL 应该知道,Spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 JsonArray,我们可以在 spark-shell 里头举个栗子来看一下。

def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .master("local[*]").appName("test")
      .getOrCreate();
    //提供隐式转换功能,比如将 Rdd 转为 dataframe
    import spark.implicits._

    val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF()
    df.show()
    /*-------------show -----------
    +---+---+
    | _1| _2|
    +---+---+
    |abc|  2|
    |efg|  4|
    +---+---+
    */

    //这里使用 dataframe Api 转换成 jsonArray
    val jsonStr:String = df.toJSON.collectAsList.toString
    println(jsonStr)
    /*--------------- json String-------------
    [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
    */
  }

可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。

[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]

而是下面这种形式。

[{"abc":2}, {"efg":4}]

这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。

import org.apache.spark.sql.DataFrame
import scala.util.parsing.json.{JSONArray, JSONObject}
object DFTest {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .master("local[*]").appName("test")
      .getOrCreate();
    //提供隐式转换功能,比如将 Rdd 转为 dataframe
    import spark.implicits._

    val df:DataFrame = spark.sparkContext.parallelize(Array(("abc",2),("efg",4))).toDF()
    df.show()
    /*-------------show -----------
    +---+---+
    | _1| _2|
    +---+---+
    |abc|  2|
    |efg|  4|
    +---+---+
    */

    //接下来不一样了
    val df2Array:Array[Tuple2[String,Int]] = df.collect().map{
      case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)}

    val jsonData:Array[JSONObject] = df2Array.map{ i =>
      new JSONObject(Map(i._1 -> i._2))
    }

    val jsonArray:JSONArray = new JSONArray(jsonData.toList)

    println(jsonArray)
    /*-----------jsonArray------------
    [{"abc" : 2}, {"efg" : 4}]
    */
  }
}

大概说明一下上述的代码,首先我们要先将 df 变量进行 collect 操作,将它转换成 Array ,但是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,所以我们需要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。

然后也是用 map 操作生成 Array[JSONObject],最后再转换成 JSONArray 就可以。

将数据转换成 json 的格式通常不能太大,一般用在 spark 跑出数据结果后写入到其他数据库的时候会用到,比如 Mysql 。

这篇关于Spark SQL中将 DataFrame 转为 json 格式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!