PySpark UDF on complex Data types
在处理系统日志或任何其他半结构化数据时,我们遇到了具有许多嵌套字段和嵌入式结构数组的数据。
我们要选择的第一个也是最简单的解决方案是展开字段,然后执行数据转换。如果您需要平面模式,这种方法并没有错,但为了保持模式完整,我们需要对嵌套字段应用转换。
一种方法是将 Dataframe 转换为 RDD 并使用低级 API 来转换 Dataframe。假设我们想使用 Spark SQL API 以方便使用
为了克服这个问题,我们可以使用 PySpark UDF,它可以将复杂字段作为参数并返回新字段。
让我们创建一个足够复杂的示例数据,以便为我们的用例处理。
从 pyspark.sql 导入 SparkSession 从 pyspark.sql.types 导入 * 从 pyspark.sql 导入行 从 pyspark.sql.functions 导入 udf, col data = [(["James","","Smith","36636","M",3000, [{'dept':'HR','allocation':0.4},{'dept':'FIN ','分配':0.6}]], ["Michael","Rose","","40288","M",4000,[{'dept':'HR','allocation':0.4},{'dept':'FIN','allocation ':0.6}]], ["罗伯特","","威廉姆斯","42114","M",4000,[{'dept':'HR','allocation':0.9},{'dept':'FIN','allocation ':0.1}]], ["Maria","Anne","Jones","39192","F",4000,[{'dept':'HR','allocation':0.75},{'dept':'FIN','分配':0.25}]], ["Jen","Mary","Brown","","F",-1,[{'dept':'HR','allocation':0.30},{'dept':'FIN','分配':0.70}]]) ] 架构 =( ArrayType(StructType([ StructField("名字",StringType(),True), StructField("中间名",StringType(),True), StructField("姓氏",StringType(),True), StructField("id", StringType(), True), StructField("性别", StringType(), True), StructField("salary", IntegerType(), True), StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()), StructField('allocation', FloatType())] ) ), 真的) ]) )) spark = SparkSession.builder.appName('test_udf').getOrCreate() df = spark.createDataFrame(data= data , schema=schema)
这只是一行数据,其中包含存储在 Array of Struct 中的许多员工的详细信息。
问题陈述:我们要添加 经理 内部结构中的字段 隶属关系 并保持 Dataframe 的结构完整,因此不希望通过爆炸来改变粒度。
Dataframe 的当前架构:
根 |-- 值:数组(可为空=真) | |-- 元素:结构(containsNull = true) | | |-- 名字:字符串(可为空 = true) | | |-- 中间名:字符串(可为空=真) | | |-- 姓氏:字符串(可为空 = true) | | |-- id: 字符串(可为空=真) | | |-- 性别:字符串(可为空=真) | | |-- 工资:整数(可为空 = true) | | |-- 隶属关系:数组(可为空 = true) | | | |-- 元素:结构(containsNull = true) | | | | |-- 部门:字符串(可为空=真) | | | | |-- 分配:浮动(可为空=真)
目标模式:在结构数组中添加新字段
根 |-- 值:数组(可为空=真) | |-- 元素:结构(containsNull = true) | | |-- 名字:字符串(可为空 = true) | | |-- 中间名:字符串(可为空=真) | | |-- 姓氏:字符串(可为空 = true) | | |-- id: 字符串(可为空=真) | | |-- 性别:字符串(可为空=真) | | |-- 工资:整数(可为空 = true) | | |-- 隶属关系:数组(可为空 = true) | | | |-- 元素:结构(containsNull = true) | | | | |-- 部门:字符串(可为空=真) | | | | |-- 分配:浮动(可为空=真) | | | | |-- 经理:字符串(可为空=真)
为了嵌入新字段,我们将编写一个 UDF,它将未分解的字段作为参数并返回一个新字段 经理 嵌入到我们想要的级别,即隶属关系数组。
定义 return_schema 在数组中有新字段 隶属关系:
return_schema = (ArrayType(StructType([ StructField("名字", StringType(), True), StructField("中间名", StringType(), True), StructField("姓氏", StringType(), True), StructField("id", StringType(), True), StructField("性别", StringType(), True), StructField("salary", IntegerType(), True), StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()), StructField('分配', FloatType()), StructField('manager', StringType())] ) ), 真的) ]) ))
下一步是编写带有对象的 UDF 图式 并返回一个新对象 return_schema。
@udf(returnType=return_schema) def add_manager(p): 行 = [] 对于 p 中的 ele: inner_rows = [] 对于 ele.affiliations 中的 aff_ele: inner_rows.append(Row(dept=aff_ele.dept, clubs=aff_ele.allocation,manager='Mr. X')) # 例如:可以通过 API 调用或映射 DS 来拉取新字段 行.追加( 行(名字=ele.firstname,中间名=ele.middlename,姓氏=ele.lastname,id=ele.id,性别=ele.gender, 薪水=ele.salary,隶属关系=inner_rows)) 返回行
最后一件事是在 select 语句中调用 udf:
df.select(add_manager(col("value"))).show(truncate=False)
Dataframe 中的每条记录都作为 Row 对象传递到 UDF。我们要修改数组的模式 隶属关系 但 不可变 Row[] 的行为不允许修改。因此,我们为每条记录创建一个新的 Row 对象,并返回一个嵌套的 Row 对象,该对象与 return_schema .
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明
本文链接:https://www.qanswer.top/1346/52362817